/*
 * Copyright (c) 2020 Huawei Technologies Co.,Ltd.
 *
 * openGauss is licensed under Mulan PSL v2.
 * You can use this software according to the terms and conditions of the Mulan PSL v2.
 * You may obtain a copy of Mulan PSL v2 at:
 *
 *          http://license.coscl.org.cn/MulanPSL2
 *
 * THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
 * EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
 * MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
 * See the Mulan PSL v2 for more details.
 * -------------------------------------------------------------------------
 *
 * memctl.cpp
 *     Routines related to memory quota for queries.
 *
 * IDENTIFICATION
 *     src/gausskernel/cbb/workload/memctl.cpp
 *
 * -------------------------------------------------------------------------
 *
 * INTERFACE ROUTINES
 *      CalculateQueryMemMain            main entry of memory control logic
 *      MethodPlanMemCalculateWalker        plan tree walker function that give memory
 *                                                             for each plan node
 *      CalculateGroupMemory              calculate memory amount for each operator group
 *      DecreaseMemPerGroup              memory decrease logic entry for each group
 *      AdjustMemOpConsumption         adjust the memory for each nodes to avoid disk
 *                                                      spill in case of inaccurate estimate
 *
 * NOTES
 *
 *      The memory control logic of this file is used to assign memory usage of each
 *      operator in the plan tree, and guarantee that the total memory usage of the whole
 *      plan stays within a threshold. The idea is based on pipeline executor module, and it
 *      considers concurrent memory usage of a query. The rule is applicable to all the
 *      executor engines with pipeline module.
 *
 *
 *      DETAILS:
 *      ------
 *      1. In optimizer phase, the plan is generated by the op work mem which is related
 *          to query available memory, and the memory usage of each memory intensive
 *          operator is already record in each plan node. All of these is the prerequisite of
 *          this logic.
 *      2. The main procedure of memory control logic includes (1) dividing the plan tree
 *          to different operating groups, and in each group memory can be calculated
 *          and processed separately. (2) calculating the total memory usage of all the
 *          groups to figure out total memory usage of the query plan. (3) In case that
 *          the memory used by the plan exceeds the threshold, the routine provides
 *          the interface to reduce memory usage in each group (operator) to meet
 *          the overall memory usage standard. (4) Doing some minor adjustment to
 *          avoid improper memory allocation in some extreme cases, like inaccurate
 *          estimation, high initial memory usage of some nodes, etc.
 *      3. As to the division rule of plan tree into group, we mainly consider the following
 *          principle: (1) The kinds of group should not be too many to avoid high
 *          overhead of maintenance. (2) Different behavior of nodes in executor engine
 *          should be differentiate by different group. Therefore, four special groups is
 *          used, including: three different joins, and append. And we put other nodes into
 *          other common group. (NOTE: If we add new operator nodes, the group
 *          classification may need to change.) For join, different joins has different build
 *          and probe logic to consider, so besides join group, build and probe side of the
 *          join are other two groups. For append(merge append), all the branches will be
 *          executed one by one, and the memory can be reused. For other group, all the
 *          operators can coexists in the same time. Also, we should consider stream operator,
 *          because once query starts, all the operators below stream will be executed
 *          simultaneously, so each operator under stream will be treated as a new group.
 *          In the end, since we only consider memory intensive operators, so we can combine
 *          memory non-intensive operators to the same group. The group's structure likes
 *          the plan tree, that each group has 0-more children, and each children group only
 *          has one parent group.
 *          For example (like the following plan):
 *
 *                                                         Sort(1)
 *                                                            |
 *                                                       Hashagg(2)
 *                                                            |
 *                                                       Hashjoin(3)
 *                                                       /           \
 *                                                Stream(4)  Stream(6)
 *                                                     |              |
 *                                                  Scan(5)   Hashjoin(7)
 *                                                                 /          \
 *                                                            Scan(8)   PartIterator(9)
 *                                                                                   |
 *                                                                               Scan(10)
 *
 *          Based on the above rule, we have the following group tree (the number in bracket
 *          is operator node number in above tree):
 *
 *                                                        Group0(1)
 *                                                              |
 *                                                        Group1(2)
 *                                                              |
 *                                                        Group2(3)
 *                                                         /         \
 *                                                  Group3(4)  Group5(6)
 *                                                        |               |
 *                                                  Group4(5)  Group6(7)
 *                                                                    /        \
 *                                                           Group7(8)   Group8(9,10)
 *
 *      4. For each group, we have three memory consumption quota: (1) below stream operator
 *          mem. This will record the sum of below stream sub group mem. Like Group0 in
 *          above group tree, the below stream memory quota should include the sum memory
 *          of Group5 and Group7. (2) group blocked op mem. This will record memory usage
 *          of the current group when starting to return tuples to above nodes. That is, current
 *          group's blocked op mem will coexist with above group. Above group's blocked op mem
 *          also refer to this quota. (3) group concurrent mem. This will record the max mem
 *          used when group top node is involved in execution. For three joins, the concurrent
 *          mem calculation is different. And group mem is the maximum of the three. With
 *          recursive mem calculation on each group, we can figure out the memory usage of
 *          the whole query. (top group)
 *      5. Then if the total memory usage exceeds the threshold, we need to decrease mem in
 *          each group to meet the threshold. For each group, we need to decrease three quota
 *          separately. This involves the recursive calculation of memory deduction of child group,
 *          and the memory intensive operator within this group.
 *      6. Then we do the final adjustment, includes: (1) make all memory intensive operator's
 *          memory usage at least 16MB, to avoid huge initial memory allocation (which is not
 *          counted in estimation). (2) if the final memory usage is close to non-disk-spill status
 *          or full-disk-spill status, just enlarge the memory usage in case of possible underestimation.
 *          (3) Assign 1MB to other memory non-intensive operator. (4) Since we may have large
 *          underestimation, if optimizer thinks it'll not spill to disk, we can set mem auto spread
 *          flag to allow dynamic memory allocation auto spread if not enough memory available,
 *          but the system memory resources are free.
 *
 */

#include "postgres.h"
#include "knl/knl_variable.h"

#include "miscadmin.h"
#include "optimizer/cost.h"
#include "optimizer/planmem_walker.h"
#include "optimizer/randomplan.h"
#include "pgxc/pgxc.h"
#include "workload/workload.h"
#include "utils/memprot.h"
#include "utils/selfuncs.h"

/*
 * When ENABLE_UT is defined, `static` expands to nothing, so every function
 * declared `static` below gets external linkage (presumably so unit tests
 * can call them directly -- confirm against the UT build).
 */
#ifdef ENABLE_UT
#define static
#endif

/* when traversing child groups from pos 0, the right tree is at pos 1 */
#define RIGHTTREE_POS 1

/*
 * Memory decrease amount if memory exceeds available mem. The decrease
 * amount is based on the proportion of group/operator mem in total mem.
 *
 * NOTE: this macro reads a variable named `totalOpMem` from the scope in
 * which it is expanded; it is not a macro parameter. The result is the
 * double-precision quotient scaled by totalDMem and truncated to int.
 */
#define MEM_DECREASE(groupMem, totalDMem) ((int)((double)(groupMem) / totalOpMem * (totalDMem)))

/*
 * Since we estimate memory consumption according to result tuple count, huge
 * overestimation will do harm to memory usage estimation, so we should do some
 * adjustment
 */

/* when the level goes beyond 3, the estimation is assumed to be inaccurate */
#define MAX_ACCURATE_LEVEL 3

/* reasonable memory usage limit 2GB when bad estimation (value presumably in KB) */
#define MAX_REASONABLE_MEM (2 * 1024L * 1024L)

/* get half of amount, that is, 0.5 */
#define HALF_AMOUNT 0.5

/*
 * We think an estimate with 2 to 5 group-by columns is inaccurate,
 * because we don't have correlation stats on multi columns,
 * but with more columns, agg groups tend to be large.
 *
 * NOTE: the `node` argument of AGG_INACCURATE is not used by the expansion.
 */
#define MIN_AGG_COL_NUM_INACCURATE 2
#define MAX_AGG_COL_NUM_INACCURATE 5
#define AGG_INACCURATE(plan, node) \
    ((plan)->numCols >= MIN_AGG_COL_NUM_INACCURATE && (plan)->numCols <= MAX_AGG_COL_NUM_INACCURATE)

/*
 * Add two integers in 64-bit arithmetic and cap the sum at INT_MAX.
 * Only the upper bound is clamped; there is no lower-bound clamp.
 */
#define INT_ADD(a, b) Min((int64)(a) + (b), INT_MAX)

/*
 * Emit a DEBUG2 message for module MOD_MEM, but only when that module/level
 * is enabled, so the ereport call is skipped entirely otherwise.
 */
#define MEMCTL_LOG(format, ...)                                                                           \
    do {                                                                                                  \
        if (is_errmodule_enable(DEBUG2, MOD_MEM)) {                                                       \
            ereport(DEBUG2, (errmodule(MOD_MEM), errmsg(format, ##__VA_ARGS__), ignore_interrupt(true))); \
        }                                                                                                 \
    } while (0)

/*
 * Basic item used when comparing memory consumers against each other
 * (see CmpMethod for the comparison methods).
 */
typedef struct OpMemItem {
    int opMem;  /* memory usage of the item */
    int id;     /* index id to identify the original pos */
    Plan* plan; /* plan node of the item */
    Cost cost;  /* calculated regression cost of the node */
} OpMemItem;

/*
 * We have two kinds of memory comparison method:
 *	OPMEM_CMP: used to do comparison between different
 *			groups. Since we don't know the real regression cost,
 *			we just average the decreased mem according to
 *			total mem, but get rid of nodes with very small mem
 *	COST_CMP: used to do comparison between plan nodes
 *			in the same group. In such cases, we will compare
 *			the regression cost of plan nodes and first decrease
 *			the mem of nodes with less regression cost
 */
typedef enum { OPMEM_CMP = 0, COST_CMP } CmpMethod;

/*
 * Memory assigned to a non-memory-intensive operator in internal calculation.
 * The literal is 1024; per the trailing comment this stands for 1MB, i.e. the
 * unit is presumably KB.
 */
#define non_operator_memory (1 * 1024)  // 1MB

/*
 * Forward declarations of the routines implemented in this file.
 * Default argument values are given here, on the declarations.
 */

/* ---------------------------------------------------------- */
/*               plan node judgement routines                 */
/* ---------------------------------------------------------- */
static bool IsPlanNode(Node* node);
static bool IsMemoryIntensiveOperator(Node* node);
static bool IsAppendNode(Plan* node);
static bool IsRequiredHashJoin(Plan* plan, bool blocked);
static bool IsRequiredNestLoop(Plan* plan, bool blocked, bool* unBlockedMat = NULL);
static bool IsMemorySensitiveNode(Plan* plan);
static bool IsJoinRightTreeUnBlocked(Plan* plan);

/* ---------------------------------------------------------- */
/*            group judgement/operation routines              */
/* ---------------------------------------------------------- */
static bool IsTopNodeBlockedOp(OperatorGroupNode* group);
static void CalChildLevelForGroup(OperatorGroupNode* group);
static OperatorGroupNode* CreateOperatorGroupNode(int groupId, Plan* node, OperatorGroupNode* parentGroup, int ng_num);
static OperatorGroupNode* GetParentOperatorGroup(OperatorGroupNode* groupNode);
static OperatorGroupNode* CreateOrFindOperatorGroupForOperator(Node* node, MethodPlanWalkerContext* context);
static void PrintOperatorNode(OperatorGroupNode* groupNode, List* ng_distributionList);
static void PrintEarlyFreeContext(MethodPlanWalkerContext* ctx);

/* ---------------------------------------------------------- */
/*                  plan traverse routines                    */
/* ---------------------------------------------------------- */
static bool MethodPlanMemCalculateWalker(Node* node, MethodPlanWalkerContext* context);

/* ---------------------------------------------------------- */
/*        group and plan memory calculation routines          */
/* ---------------------------------------------------------- */
static int GetTopNodeMemKB(OperatorGroupNode* group, int ng_index, int* minMem = NULL);
static int GetBlockedOpMem(OperatorGroupNode* group, int ng_index, int* minMem = NULL, bool unBlockedMat = false);
static int GetGroupMemKB(OperatorGroupNode* group, int ng_index, int* minMem = NULL);
static int GetBelowStreamConcMemKB(OperatorGroupNode* group, int ng_index, int* minMem = NULL);
static void GroupAddMemory(int* baseArray, const int* memArray, bool check = true);
static int GetChildMaxMem(OperatorGroupNode* group);
static void GetStreamMemConsumption(Plan* node, MethodPlanWalkerContext* context);
static int CalcJoinConcMem(
    OperatorGroupNode* group, int* buildMaxMem, int* probeMaxMem, int* build0MaxMem, int ng_index, int* minMem = NULL);
static void CalculateGroupMemory(OperatorGroupNode* group, bool firstTime);
static void CalculateBlockedJoinGroupMemory(OperatorGroupNode* group, bool firstTime);
static void CalculateGroupMemFromChild(
    OperatorGroupNode* group, bool childGroupCoexist, bool isBlockedJoin, bool firstTime);

/* ---------------------------------------------------------- */
/*              memory usage decrease routines                */
/* ---------------------------------------------------------- */
static void SetMinimumDMem(OperatorGroupNode* group);
static void DecreaseMemPerGroup(OperatorGroupNode* group, QueryMemKB* ng_queryMemKBArray, bool decreaseMem);
static void AssignDMemToTopNode(OperatorGroupNode* group, int totalOpMem, int dMem, const char* funcname, int ng_index);
static void AssignDMemToBlockedOp(
    OperatorGroupNode* group, int totalOpMem, int dMem, const char* funcname, int ng_index, bool unBlockedMat = false);
static void AssignDMemToGroup(OperatorGroupNode* group, int totalOpMem, int dMem, const char* funcname, int ng_index);
static void AssignDecreaseMemToChild(OperatorGroupNode* group, int ng_index, bool decreaseMem);
static void DecreaseOpMemToNode(
    OperatorGroupNode* group, List* childTopOps, int opDMem, int ng_index, bool decreaseMem);
static void DecreaseOpMemToGroup(OperatorGroupNode* group, int groupOpDMem, int ng_index, bool decreaseMem);
static void DecreaseOpMem(OperatorGroupNode* group, int ng_index, bool decreaseMem);
static void AssignDConcMemForBuild(
    OperatorGroupNode* buildGroup, OperatorGroupNode* probeGroup, int joinDMem, int ng_index, bool decreaseMem);
static void AssignDConcMemForProbe(
    OperatorGroupNode* buildGroup, OperatorGroupNode* probeGroup, int joinDMem, int ng_index, bool decreaseMem);
static void AssignDConcMemForHashJoinBuild(OperatorGroupNode* group, int joinDMem, int ng_index, bool decreaseMem);
static void AssignDConcMemForHashJoinProbe(OperatorGroupNode* group, int joinDMem, int ng_index, bool decreaseMem);
static void AssignDecreasedJoinConcMem(OperatorGroupNode* group, int joinDMem, int buildMaxMem, int probeMaxMem,
    int build0MaxMem, int ng_index, bool decreaseMem);
static void AssignConcDecreaseMemToChild(OperatorGroupNode* group, int ng_index, bool decreaseMem);

/* ---------------------------------------------------------- */
/*        memory comparison and calculation routines          */
/* ---------------------------------------------------------- */
static void AdjustMemForEstimationCrisis(MethodPlanWalkerContext* context, Plan* node, OpMemInfo* info);
static void AssignMemOpConsumption(MethodPlanWalkerContext* context, Plan* node);
static bool CompareOpMemFuzzy(double mem1, double mem2);
static int CalOpSpreadMem(Plan* node, MethodPlanWalkerContext* context);
static void AdjustMemOpConsumption(MethodPlanWalkerContext* context, Plan* node);
static int OpMemItemCmp(const void* a, const void* b, void* arg);
static void CalcDecreaseRegressCost(OpMemItem* item, int dMem);
static int ChooseDMemItems(OpMemItem* itemArray, int groupNum, int* totalOpMem, int DMem, bool decreaseMem);
/* NOTE: the next two prototypes are declared without `static`, unlike the rest of this list */
void SetNgAssignedQueryMem(MethodPlanWalkerContext* cxt, int assigned_query_mem);
void ReleaseResource(MethodPlanWalkerContext* ctx);

static int GetHashaggInitializedMem(Agg* node);

/* Routines defined outside this file */
extern List* PgxcGroupGetLogicClusterList(Bitmapset* nodeids);
extern char* get_pgxc_groupname(Oid groupoid, char* groupname = NULL);
/* Initialised to InvalidOid; THR_LOCAL presumably makes it per-thread -- confirm against its definition */
THR_LOCAL Oid lc_replan_nodegroup = InvalidOid;

/* Function implementation */
/* ---------------------------------------------------------- */
/*                          plan node judgement routines                                 */
/* ---------------------------------------------------------- */
/*
 * IsPlanNode
 *	tell whether a node's tag falls into one of the plan node tag ranges
 *
 * Parameters:
 *	@in node: node to be checked, may be NULL
 *
 * Returns: false for NULL; true if the tag is T_RemoteQuery or lies in
 *	[T_BaseResult, T_Stream] (row engine) or [T_VecResult, T_VecRemoteQuery]
 *	(vec engine); false otherwise
 */
static bool IsPlanNode(Node* node)
{
    if (node == NULL) {
        return false;
    }

    /* read the tag once and test it against each accepted range in turn */
    const int tag = (int)nodeTag(node);

    if (tag >= T_BaseResult && tag <= T_Stream) {
        return true;
    }
    if (tag == T_RemoteQuery) {
        return true;
    }

    return tag >= T_VecResult && tag <= T_VecRemoteQuery;
}

/*
 * IsMemoryIntensiveOperator
 *	decide whether an operator counts as memory intensive
 *
 * Parameters:
 *	@in node: node of plan (must not be NULL, its tag is read directly)
 *
 * Returns: true if node is memory intensive, or false
 */
static bool IsMemoryIntensiveOperator(Node* node)
{
    bool intensive = false;

    switch (nodeTag(node)) {
        /* these operators are always treated as memory intensive */
        case T_Material:
        case T_VecMaterial:
        case T_Sort:
        case T_VecSort:
        case T_HashJoin:
        case T_VecHashJoin:
        case T_AsofJoin:
        case T_VecAsofJoin:
            intensive = true;
            break;

        /* a setop only qualifies when it uses the hashed strategy */
        case T_SetOp:
        case T_VecSetOp:
            intensive = (((SetOp*)node)->strategy == SETOP_HASHED);
            break;

        /* an agg only qualifies when it is a hashagg */
        case T_Agg:
        case T_VecAgg:
            intensive = (((Agg*)node)->aggstrategy == AGG_HASHED);
            break;

        /* modify table qualifies once a positive max memory is recorded on it */
        case T_ModifyTable:
        case T_VecModifyTable:
            intensive = (((ModifyTable*)node)->mem_info.maxMem > 0);
            break;

        /* same rule for the cstore index scans */
        case T_CStoreIndexScan:
        case T_CStoreIndexHeapScan:
            intensive = (((Scan*)node)->mem_info.maxMem > 0);
            break;

        /*
         * groupagg with distinct clause and AP function is
         * memory intensive. windowagg with agg function is
         * memory intensive. Need to add later
         */
        default:
            break;
    }

    return intensive;
}

/*
 * IsAppendNode
 *	check if the node is an append node (Append, VecAppend or MergeAppend)
 *
 * Parameters:
 *	@in node: node of plan
 *
 * Returns: true if it's an append node, else false
 */
static bool IsAppendNode(Plan* node)
{
    switch (nodeTag(node)) {
        case T_Append:
        case T_VecAppend:
        case T_MergeAppend:
            return true;
        default:
            return false;
    }
}

/*
 * IsRequiredHashJoin
 *	check whether an operator is a hashjoin of the requested kind
 *
 * Parameters:
 *	@in plan: node of plan
 *	@in blocked: if true, only accept a hashjoin whose build side blocks
 *			the probe side. A vec hashjoin is always accepted. A row
 *			hashjoin is rejected when its join type is left, anti or
 *			left-anti-full, or when both sides are not streamed and the
 *			lefttree startup cost is below the righttree total cost
 *			(probe side is tried first in that case).
 *			If false, any hashjoin (row or vec) is accepted.
 *
 * Returns: true if the node is a hashjoin that satisfies the request, or false
 */
static bool IsRequiredHashJoin(Plan* plan, bool blocked)
{
    /* vec engine hashjoin qualifies regardless of "blocked" */
    if (IsA(plan, VecHashJoin)) {
        return true;
    }

    if (!IsA(plan, HashJoin)) {
        return false;
    }

    /* caller does not care about blocking, any row hashjoin will do */
    if (!blocked) {
        return true;
    }

    HashJoin* hashJoin = (HashJoin*)plan;

    /* for row engine, left join or anti join is not blocked */
    switch (hashJoin->join.jointype) {
        case JOIN_LEFT:
        case JOIN_ANTI:
        case JOIN_LEFT_ANTI_FULL:
            return false;
        default:
            break;
    }

    /* for row engine, if lefttree start up cost is less, then try to do probe first */
    const bool probeFirst =
        !hashJoin->streamBothSides && plan->lefttree->startup_cost < plan->righttree->total_cost;

    return !probeFirst;
}

/*
 * IsRequiredNestLoop
 *	check whether an operator is a nestloop of the requested kind
 *
 * Parameters:
 *	@in plan: node of plan
 *	@in blocked: if we need blocked nestloop. For nestloop, we have three
 *			kinds of executor mode. 1. right tree is materialized all, that is,
 *			first scan all right tree and build material, and then probe.
 *			2. right tree is materialized but not all; in this case we first scan
 *			probe side, and for each probe row, we search build side, and
 *			put them in material at the same time, and for the next probe
 *			row, it can fetch inner row from material. 3. there's no
 *			materialize at build side, that is, for each fetch of inner rows,
 *			we should always do the execution. Case 2 and 3 are not blocked.
 *	@out unBlockedMat: set to true (when non-NULL) for case 2 of the above
 *			comment, i.e. right tree is a material node without materialAll.
 *			It is never written in any other case.
 *
 * Returns: true if the node is a nestloop that satisfies the request, or false
 */
static bool IsRequiredNestLoop(Plan* plan, bool blocked, bool* unBlockedMat)
{
    if (!(IsA(plan, NestLoop) || IsA(plan, VecNestLoop))) {
        return false;
    }

    /* any nestloop is fine when blocking does not matter */
    if (!blocked) {
        return true;
    }

    /* Case 3: inner side is not a material node at all */
    if (!(IsA(plan->righttree, Material) || IsA(plan->righttree, VecMaterial))) {
        return false;
    }

    /* Case 1: inner side is fully materialized before probing */
    if (((NestLoop*)plan)->materialAll) {
        return true;
    }

    /* Case 2: material exists but is filled on the fly */
    if (unBlockedMat != NULL) {
        *unBlockedMat = true;
    }

    return false;
}

/*
 * IsMemorySensitiveNode
 *	check if the node is memory sensitive, that is, giving it more memory
 *	will greatly impact performance
 *
 * Parameters:
 *	@in plan: node of plan
 *
 * Returns: true if node is memory sensitive
 */
static bool IsMemorySensitiveNode(Plan* plan)
{
    bool sensitive = false;

    switch (nodeTag(plan)) {
        /* batches of hashjoin is influenced by memory predication */
        case T_HashJoin:
            sensitive = true;
            break;

        /* a vec hashjoin only counts when it is a sonic hash join */
        case T_VecHashJoin:
            if (((HashJoin*)plan)->isSonicHash) {
                sensitive = true;
            }
            break;

        /* less data will spill to disk given more memory for hashagg or hashsetop */
        case T_Agg:
        case T_VecAgg:
        case T_SetOp:
        case T_VecSetOp:
        case T_ModifyTable:
        case T_VecModifyTable:
            sensitive = true;
            break;

        default:
            break;
    }

    return sensitive;
}

/*
 * IsBlockedJoinNode
 *	check if the node is a blocked join node. mergejoin (row or vec),
 *	blocked hashjoin and blocked nestloop are treated as blocked join nodes.
 *
 * Parameters:
 *	@in node: node of plan
 *
 * Returns: true if join node is blocked
 */
bool IsBlockedJoinNode(Plan* node)
{
    if (IsRequiredHashJoin(node, true)) {
        return true;
    }

    if (IsA(node, MergeJoin) || IsA(node, VecMergeJoin)) {
        return true;
    }

    return IsRequiredNestLoop(node, true);
}

/*
 * IsJoinRightTreeUnBlocked
 *	check if right tree of a join is unblocked (i.e. the right tree node can
 *	output tuples before the lower plans finish). This is reported for a
 *	mergejoin whose right tree is a materialize node, and for a nestloop
 *	whose right tree is a materialize node that is not "material all".
 *
 * Parameters:
 *	@in plan: node of plan
 *
 * Returns: true if the right tree is unblocked
 */
static bool IsJoinRightTreeUnBlocked(Plan* plan)
{
    /* For materialize on right tree of mergejoin, should count all child blocked op */
    const bool rowMergeOverMaterial = IsA(plan, MergeJoin) && IsA(plan->righttree, Material);
    if (rowMergeOverMaterial || (IsA(plan, VecMergeJoin) && IsA(plan->righttree, VecMaterial))) {
        return true;
    }

    bool rightUnblocked = false;

    /*
     * For un-materialize-all case, we should count all child blocked op.
     * The second call is made only for its out parameter.
     */
    if (IsRequiredNestLoop(plan, false)) {
        (void)IsRequiredNestLoop(plan, true, &rightUnblocked);
    }

    return rightUnblocked;
}

/* ---------------------------------------------------------- */
/*                     group judgement/operation routines     */
/* ---------------------------------------------------------- */

/*
 * IsTopNodeBlockedOp
 *	check if the top node of the group is blocked. A virtual group is
 *	never blocked. A hashjoin (row or vec) returns tuples while probing
 *	the lefttree, so it is not blocked either. Everything else is blocked.
 *
 * Parameters:
 *	@in group: input operator group node
 *
 * Returns: true if the top node is blocked, else false
 */
static bool IsTopNodeBlockedOp(OperatorGroupNode* group)
{
    /* the hashjoin test is skipped for virtual groups, as before */
    return !group->virtualGroup && !IsRequiredHashJoin(group->topNode, false);
}

/*
 * CalChildLevelForGroup
 *	work out how many levels of child groups sit below this group and
 *	store the result in group->childLevel (and group->outerLevel for a
 *	blocked hashjoin). The deeper the tree, the less accurate the
 *	estimation is assumed to be.
 *
 * Parameters:
 *	@in group: input operator group node; its child groups must already
 *			carry their own levels
 *
 * Returns: void
 */
static void CalChildLevelForGroup(OperatorGroupNode* group)
{
/* level of a group as seen from its parent: the deeper of its two recorded levels */
#define CHILD_LEVEL(group) Max((group)->childLevel, (group)->outerLevel)
    int level = 0;

    if (IsRequiredHashJoin(group->topNode, true)) {
        /*
         * For hashjoin, the accuracy only depends on inner side, so the
         * outer side is recorded separately and only the inner side
         * decides the child level.
         */
        OperatorGroupNode* outerGroup = (OperatorGroupNode*)linitial(group->childGroups);
        group->outerLevel = CHILD_LEVEL(outerGroup) + 1;

        OperatorGroupNode* innerGroup = (OperatorGroupNode*)lsecond(group->childGroups);
        level = CHILD_LEVEL(innerGroup) + 1;
    } else if (IsBlockedJoinNode(group->topNode)) {
        /* merge join and nestloop: decide from both join sides */
        OperatorGroupNode* outerGroup = (OperatorGroupNode*)linitial(group->childGroups);
        OperatorGroupNode* innerGroup = (OperatorGroupNode*)lsecond(group->childGroups);
        int outerDepth = CHILD_LEVEL(outerGroup);
        int innerDepth = CHILD_LEVEL(innerGroup);

        level = Max(outerDepth, innerDepth) + 1;
    } else if (list_length(group->childGroups) == 1) {
        /* a single child under a stream does not add a level of its own */
        OperatorGroupNode* onlyChild = (OperatorGroupNode*)linitial(group->childGroups);

        level = CHILD_LEVEL(onlyChild);
        if (!onlyChild->underStreamGroup) {
            level += 1;
        }
    } else {
        /* zero or several children: take the deepest one plus one */
        ListCell* cell = NULL;

        foreach (cell, group->childGroups) {
            OperatorGroupNode* child = (OperatorGroupNode*)lfirst(cell);
            int candidate = CHILD_LEVEL(child) + 1;

            if (candidate > level) {
                level = candidate;
            }
        }
    }

    group->childLevel = level;
}

/*
 * CreateOperatorGroupNode
 *	allocate a zero-initialized operator group and fill in its identity.
 *
 * Parameters:
 *	@in groupId: the id assigned to the new group
 *	@in node: plan node that becomes the top node of the new group
 *	@in parentGroup: parent group of the new group, NULL for the root
 *	@in ng_num: num of nodegroup, also the length of the per-nodegroup
 *			memory array allocated for the group
 *
 * Returns: the newly created group. It is NOT linked into the parent's
 *	child list here; the caller does that.
 */
static OperatorGroupNode* CreateOperatorGroupNode(int groupId, Plan* node, OperatorGroupNode* parentGroup, int ng_num)
{
    OperatorGroupNode* newGroup = (OperatorGroupNode*)palloc0(sizeof(OperatorGroupNode));

    /*
     * One zeroed memory record per nodegroup.
     * 0 is original data for not logic cluster case
     */
    newGroup->ng_groupMemKBArray = (OperatorGroupNodeMem*)palloc0(ng_num * sizeof(OperatorGroupNodeMem));
    newGroup->ng_num = ng_num;

    newGroup->parentGroup = parentGroup;
    newGroup->topNode = node;
    newGroup->groupId = groupId;

    return newGroup;
}

/*
 * GetParentOperatorGroup
 *	accessor for the parent of an operator group.
 *
 * Parameters:
 *	@in groupNode: input group (must not be NULL)
 *
 * Returns: the input group's parent group, as stored in its parentGroup field
 */
static OperatorGroupNode* GetParentOperatorGroup(OperatorGroupNode* groupNode)
{
    OperatorGroupNode* parent = groupNode->parentGroup;

    return parent;
}

/*
 * CreateOrFindOperatorGroupForOperator
 *	create (during the ASSIGN_MEM phase) or find (during any other phase)
 *	the operator group for a given operator node, if the node is a
 *	potential root of an operator group. Otherwise the current group of
 *	the context is returned unchanged.
 *
 *	A node starts a new group when it is the very first node visited
 *	(context->nextGroupId == 0), when it is memory intensive, a blocked
 *	join or an append node, or when the context carries the UNDER_STREAM /
 *	UNDER_MULTI_GROUP_OP flag and the node is not a Hash node.
 *
 * Parameters:
 *	@in node: current traverse node
 *	@in context: plan tree traverse context. nextGroupId is advanced and
 *			the UNDER_STREAM / UNDER_MULTI_GROUP_OP status flags are
 *			cleared whenever a group boundary is consumed; groupTree
 *			is set when the root group is created.
 *
 * Returns: new created or found group
 */
static OperatorGroupNode* CreateOrFindOperatorGroupForOperator(Node* node, MethodPlanWalkerContext* context)
{
    OperatorGroupNode* groupNode = context->groupNode;
    int ng_num = context->ng_num;

    /*
     * If the group tree has not been built, we create
     * the first operator group here.
     */
    if (context->nextGroupId == 0) {
        /* for the first traverse phase: assign mem, we create group, or find the already created one */
        if (context->phase == ASSIGN_MEM) {
            groupNode = CreateOperatorGroupNode(context->nextGroupId, (Plan*)node, NULL, ng_num);

            context->groupTree = groupNode;
        } else {
            groupNode = context->groupTree;
        }

        context->nextGroupId++;
    /*
     * If this node is a potential root of an operator group, this means that
     * the current group ends, and a new group starts. we create a new operator
     * group.
     */
    } else if (IsMemoryIntensiveOperator(node) || IsBlockedJoinNode((Plan*)node) || IsAppendNode((Plan*)node) ||
             ((context->status & (UNDER_STREAM | UNDER_MULTI_GROUP_OP)) != 0 && !IsA(node, Hash))) {
        Assert(groupNode != NULL);

        /* for the first traverse phase: assign mem, we create group, or find the already created one */
        if (context->phase == ASSIGN_MEM) {
            OperatorGroupNode* parentGroupNode = groupNode;

            groupNode = CreateOperatorGroupNode(context->nextGroupId, (Plan*)node, groupNode, ng_num);

            /* a group whose top node holds no memory itself is only a structural (virtual) group */
            if (!IsMemoryIntensiveOperator(node)) {
                groupNode->virtualGroup = true;
            }
            if ((context->status & UNDER_STREAM) != 0) {
                groupNode->underStreamGroup = true;
            }

            parentGroupNode->childGroups = lappend(parentGroupNode->childGroups, groupNode);
        } else {
            ListCell* lc = NULL;

            /* look among the children of the current group for the one with the expected id */
            foreach (lc, groupNode->childGroups) {
                OperatorGroupNode* childGroupNode = (OperatorGroupNode*)lfirst(lc);
                if (childGroupNode->groupId == context->nextGroupId) {
                    groupNode = childGroupNode;
#ifdef USE_ASSERT_CHECKING
                    if (!IsMemoryIntensiveOperator(node)) {
                        Assert(groupNode->virtualGroup);
                    }
                    if ((context->status & UNDER_STREAM) != 0) {
                        Assert(groupNode->underStreamGroup);
                    }
#endif
                    /* group ids are unique, nothing further to look for */
                    break;
                }
            }

            /*
             * lc is non-NULL only if the loop left through the break above,
             * i.e. the group created in the ASSIGN_MEM phase was found again.
             * (The previous check, lc == NULL after a loop without break,
             * was always true and could not detect a failed lookup.)
             */
            Assert(lc != NULL);
        }

        /* if we use the flag, then reset them to avoid reuse it */
        context->status &= ~(UNDER_STREAM | UNDER_MULTI_GROUP_OP);

        context->nextGroupId++;
    }

    return groupNode;
}

/*
 * PrintOperatorNode
 *	print one operator group as a single MEMCTL_LOG message.
 *
 * Every memory figure is printed as its normal value (array slot 0) and, when
 * the minimum value (array slot 1) is greater than zero, directly followed by
 * "(min)". The per-nodegroup section uses the same layout as the group-level
 * section, so the "(min)" suffix always sits next to the value it belongs to
 * instead of being glued to the label of the following field.
 *
 * Parameters:
 *	@in groupNode: operator group to print; NULL is silently ignored
 *	@in ng_distributionList: nodegroup (Distribution) list, may be NULL; the
 *		k-th list entry is printed from slot k + 1 of the per-nodegroup arrays
 *		(slot 0 is printed in the group-level section)
 *
 * Returns: void
 */
static void PrintOperatorNode(OperatorGroupNode* groupNode, List* ng_distributionList)
{
    if (groupNode == NULL) {
        return;
    }

    StringInfoData buf;
    initStringInfo(&buf);

    /* Group basic information */
    appendStringInfo(&buf, _("[group ID: %d, "), groupNode->groupId);
    appendStringInfo(
        &buf, _("top node ID: %d(%d"), groupNode->topNode->plan_node_id, groupNode->topNode->operatorMemKB[0]);
    if (groupNode->topNode->operatorMemKB[1] > 0) {
        appendStringInfo(&buf, _("(%d)"), groupNode->topNode->operatorMemKB[1]);
    }
    appendStringInfo(&buf, _("), "));

    if (groupNode->parentGroup != NULL) {
        appendStringInfo(&buf, _("parentGroup ID: %d, "), groupNode->parentGroup->groupId);
    } else {
        appendStringInfo(&buf, _("parentGroup is NULL, "));
    }

    /* Group memory usage information (slot 0 of the per-nodegroup array) */
    appendStringInfo(&buf, _("groupMemKB: %d"), groupNode->ng_groupMemKBArray[0].groupMemKB[0]);
    if (groupNode->ng_groupMemKBArray[0].groupMemKB[1] > 0) {
        appendStringInfo(&buf, _("(%d)"), groupNode->ng_groupMemKBArray[0].groupMemKB[1]);
    }
    appendStringInfo(&buf, _(", groupOpMemKB: %d"), groupNode->ng_groupMemKBArray[0].groupOpMemKB[0]);
    if (groupNode->ng_groupMemKBArray[0].groupOpMemKB[1] > 0) {
        appendStringInfo(&buf, _("(%d)"), groupNode->ng_groupMemKBArray[0].groupOpMemKB[1]);
    }
    appendStringInfo(&buf, _(", groupBlockedOpMemKB: %d"), groupNode->ng_groupMemKBArray[0].groupBlockedOpMemKB[0]);
    if (groupNode->ng_groupMemKBArray[0].groupBlockedOpMemKB[1] > 0) {
        appendStringInfo(&buf, _("(%d)"), groupNode->ng_groupMemKBArray[0].groupBlockedOpMemKB[1]);
    }
    appendStringInfo(&buf, _(", groupConcMemKB: %d"), groupNode->ng_groupMemKBArray[0].groupConcMemKB[0]);
    if (groupNode->ng_groupMemKBArray[0].groupConcMemKB[1] > 0) {
        appendStringInfo(&buf, _("(%d)"), groupNode->ng_groupMemKBArray[0].groupConcMemKB[1]);
    }
    appendStringInfo(&buf, _(", belowStreamConcMemKB: %d"), groupNode->ng_groupMemKBArray[0].belowStreamConcMemKB[0]);
    if (groupNode->ng_groupMemKBArray[0].belowStreamConcMemKB[1] > 0) {
        appendStringInfo(&buf, _("(%d)"), groupNode->ng_groupMemKBArray[0].belowStreamConcMemKB[1]);
    }

    /* nodegroup memory usage information */
    if (ng_distributionList != NULL) {
        ListCell* cell = NULL;
        int i = 1; /* per-nodegroup slots start at 1 */

        foreach (cell, ng_distributionList) {
            Oid ng_oid = ((Distribution*)lfirst(cell))->group_oid;

            /*
             * Each separator is emitted together with the NEXT label, so the
             * optional "(min)" suffix lands right after its own value.
             */
            appendStringInfo(
                &buf, _(", NodeGroup ID: %u(%d"), ng_oid, groupNode->topNode->ng_operatorMemKBArray[i][0]);
            if (groupNode->topNode->ng_operatorMemKBArray[i][1] > 0) {
                appendStringInfo(&buf, _("(%d)"), groupNode->topNode->ng_operatorMemKBArray[i][1]);
            }

            appendStringInfo(&buf, _("), groupMemKB: %d"), groupNode->ng_groupMemKBArray[i].groupMemKB[0]);
            if (groupNode->ng_groupMemKBArray[i].groupMemKB[1] > 0) {
                appendStringInfo(&buf, _("(%d)"), groupNode->ng_groupMemKBArray[i].groupMemKB[1]);
            }
            appendStringInfo(&buf, _(", groupOpMemKB: %d"), groupNode->ng_groupMemKBArray[i].groupOpMemKB[0]);
            if (groupNode->ng_groupMemKBArray[i].groupOpMemKB[1] > 0) {
                appendStringInfo(&buf, _("(%d)"), groupNode->ng_groupMemKBArray[i].groupOpMemKB[1]);
            }
            appendStringInfo(
                &buf, _(", groupBlockedOpMemKB: %d"), groupNode->ng_groupMemKBArray[i].groupBlockedOpMemKB[0]);
            if (groupNode->ng_groupMemKBArray[i].groupBlockedOpMemKB[1] > 0) {
                appendStringInfo(&buf, _("(%d)"), groupNode->ng_groupMemKBArray[i].groupBlockedOpMemKB[1]);
            }
            appendStringInfo(&buf, _(", groupConcMemKB: %d"), groupNode->ng_groupMemKBArray[i].groupConcMemKB[0]);
            if (groupNode->ng_groupMemKBArray[i].groupConcMemKB[1] > 0) {
                appendStringInfo(&buf, _("(%d)"), groupNode->ng_groupMemKBArray[i].groupConcMemKB[1]);
            }
            appendStringInfo(
                &buf, _(", belowStreamConcMemKB: %d"), groupNode->ng_groupMemKBArray[i].belowStreamConcMemKB[0]);
            if (groupNode->ng_groupMemKBArray[i].belowStreamConcMemKB[1] > 0) {
                appendStringInfo(&buf, _("(%d)"), groupNode->ng_groupMemKBArray[i].belowStreamConcMemKB[1]);
            }

            i++;
        }
    }

    /* Group below stream group information: comma separated list of group IDs */
    if (groupNode->belowStreamGroups != NIL) {
        ListCell* lc = NULL;

        appendStringInfo(&buf, _(", below stream groups: ("));
        foreach (lc, groupNode->belowStreamGroups) {
            OperatorGroupNode* subNode = (OperatorGroupNode*)lfirst(lc);
            if (lc != list_head(groupNode->belowStreamGroups)) {
                appendStringInfo(&buf, _(", "));
            }
            appendStringInfo(&buf, _("%d"), subNode->groupId);
        }
        appendStringInfo(&buf, _(")"));
    }

    appendStringInfo(&buf, _("]"));

    MEMCTL_LOG("%s", buf.data);

    /* the buffer was allocated by initStringInfo and is only needed for this one log call */
    pfree(buf.data);
}

/*
 * PrintEarlyFreeContext
 *	print out all the operator groups in level (breadth-first) traversal,
 *	starting from ctx->groupTree.
 *
 * Parameters:
 *	@in ctx: plan tree walker context; a NULL context is reported in the log,
 *		a NULL group tree prints only the header line
 *
 * Returns: void
 */
static void PrintEarlyFreeContext(MethodPlanWalkerContext* ctx)
{
    if (ctx == NULL) {
        MEMCTL_LOG("MethodEarlyFreeContext is NULL. Something is wrong. ");
        return;
    }

    MEMCTL_LOG("Print EarlyFreeContext: ");

    OperatorGroupNode* rootGroup = ctx->groupTree;
    if (rootGroup == NULL) {
        /* PrintOperatorNode ignores a NULL group, but the traversal below would dereference it */
        return;
    }

    /*
     * Work queue for the level traversal, seeded with the root group. Child
     * groups are appended while the queue is being walked, so the loop relies
     * on foreach picking up cells appended behind the current one.
     */
    List* pending = NULL;
    pending = lappend(pending, rootGroup);

    ListCell* lc = NULL;
    foreach (lc, pending) {
        OperatorGroupNode* group = (OperatorGroupNode*)lfirst(lc);
        ListCell* childCell = NULL;

        PrintOperatorNode(group, ctx->ng_distributionList);

        foreach (childCell, group->childGroups) {
            OperatorGroupNode* childGroup = (OperatorGroupNode*)lfirst(childCell);
            pending = lappend(pending, childGroup);
        }
    }

    /* only the queue cells are released here, the groups themselves are not touched */
    list_free_ext(pending);
}

/* ---------------------------------------------------------- */
/*                                plan traverse routines      */
/* ---------------------------------------------------------- */

/*
 * AdjustQueryMem
 *	round the estimated query_mem into one of the supported ranges
 *
 * Parameters:
 *	@in/out query_mem: two-slot array; slot 0 is compared against the
 *		thresholds and rewritten, slot 1 is only cleared together with slot 0
 *	@in no_stats: true when the query is treated as lacking statistics
 *	@in ctx: plan tree walker context, may be NULL; when given, queryMemKB of
 *		every nodegroup slot is cleared together with query_mem
 *
 * Returns: void
 */
void AdjustQueryMem(int* query_mem, bool no_stats, MethodPlanWalkerContext* ctx)
{
    /* Below MEM_THRESHOLD (32MB per the original note): only queries without stats are changed */
    if (query_mem[0] < MEM_THRESHOLD * 1024L) {
        if (!no_stats) {
            return;
        }

        /* Zero means "no query_mem", i.e. fall back to work_mem */
        query_mem[0] = query_mem[1] = 0;

        if (ctx == NULL) {
            return;
        }
        for (int ngIdx = 0; ngIdx < ctx->ng_num; ngIdx++) {
            ctx->ng_queryMemKBArray[ngIdx].queryMemKB = 0;
        }
        return;
    }

    /* Between the threshold and half of STATEMENT_MIN_MEM: lift to half of STATEMENT_MIN_MEM */
    if (query_mem[0] < STATEMENT_MIN_MEM * 1024L * HALF_AMOUNT) {
        query_mem[0] = (int)(STATEMENT_MIN_MEM * 1024L * HALF_AMOUNT);
        return;
    }

    /* Between half and full STATEMENT_MIN_MEM: lift to STATEMENT_MIN_MEM; larger values stay untouched */
    if (query_mem[0] < STATEMENT_MIN_MEM * 1024L) {
        query_mem[0] = STATEMENT_MIN_MEM * 1024L;
    }
}

/*
 * SetQueryMem
 *	derive stmt->query_mem from the per-nodegroup totals kept in the context
 *
 * Parameters:
 *	@in ctx: plan tree traverse context holding ng_queryMemKBArray
 *	@in/out stmt: planned statement whose query_mem[0] / query_mem[1] are set
 *	@in no_stats: forwarded to AdjustQueryMem when the context is in an adjust phase
 *
 * Returns: void
 */
void SetQueryMem(MethodPlanWalkerContext* ctx, PlannedStmt* stmt, bool no_stats)
{
    if (ctx->ng_num == 1) {
        /* Only slot 0 exists: not the logic cluster case */
        const QueryMemKB* total = &ctx->ng_queryMemKBArray[0];

        stmt->query_mem[0] = INT_ADD(total->queryMemKB, total->streamTotalMemKB);
        /* query_mem[1] is left as it was when there is no minimum memory */
        if (total->minQueryMemKB > 0) {
            stmt->query_mem[1] = INT_ADD(total->minQueryMemKB, total->streamTotalMemKB);
        }
    } else {
        /* Logic cluster case: take the largest value over all nodegroup slots (1 .. ng_num - 1) */
        int peakMem = 0;
        int peakMinMem = 0;

        for (int ngIdx = 1; ngIdx < ctx->ng_num; ngIdx++) {
            const QueryMemKB* ngMem = &ctx->ng_queryMemKBArray[ngIdx];
            int ngPeakMem = INT_ADD(ngMem->queryMemKB, ngMem->streamTotalMemKB);
            int ngMinMem = 0;

            if (ngMem->minQueryMemKB > 0) {
                ngMinMem = INT_ADD(ngMem->minQueryMemKB, ngMem->streamTotalMemKB);
            }

            peakMem = Max(peakMem, ngPeakMem);
            peakMinMem = Max(peakMinMem, ngMinMem);
        }

        stmt->query_mem[0] = peakMem;
        stmt->query_mem[1] = peakMinMem;
    }

    if (ADJ_PHASE(ctx)) {
        AdjustQueryMem(stmt->query_mem, no_stats, ctx);
    }

    /* When query_mem is set, neither slot may end up below the assigned value */
    if (VALID_QUERY_MEM()) {
        for (int slot = 0; slot < 2; slot++) {
            stmt->query_mem[slot] = Max(stmt->query_mem[slot], stmt->assigned_query_mem[1]);
        }
    }
}

/*
 * SetNgQueryMem
 *	fill stmt->ng_queryMem with one query_mem entry per nodegroup
 *
 * Parameters:
 *	@in/out stmt: planned statement; ng_num is always set, ng_queryMem is
 *		allocated and filled only when the context has nodegroup slots
 *	@in ctx: plan tree walker context
 *	@in no_stats: forwarded to AdjustQueryMem for each nodegroup
 *
 * Returns: void
 */
void SetNgQueryMem(PlannedStmt* stmt, MethodPlanWalkerContext* ctx, bool no_stats)
{
    stmt->ng_num = list_length(ctx->ng_distributionList);

    /* No nodegroup slots beyond slot 0: nothing to fill */
    if (ctx->ng_num <= 1) {
        Assert(stmt->ng_queryMem == NULL);
        return;
    }

    stmt->ng_queryMem = (NodeGroupQueryMem*)palloc0(stmt->ng_num * sizeof(NodeGroupQueryMem));

    int ngIdx = 0;
    ListCell* distCell = NULL;
    foreach (distCell, ctx->ng_distributionList) {
        Oid ng_oid = ((Distribution*)lfirst(distCell))->group_oid;
        NodeGroupQueryMem* target = &stmt->ng_queryMem[ngIdx];
        /* list entry k maps to context slot k + 1, slot 0 is the non logic cluster case */
        const QueryMemKB* source = &ctx->ng_queryMemKBArray[ngIdx + 1];

        /* identity of the nodegroup */
        target->ng_oid = ng_oid;
        errno_t rc = snprintf_s(target->nodegroup, NAMEDATALEN, NAMEDATALEN - 1, "%s", get_pgxc_groupname(ng_oid));
        securec_check_ss(rc, "\0", "\0");

        /* normal and (optional) minimum memory, both including stream memory */
        target->query_mem[0] = INT_ADD(source->queryMemKB, source->streamTotalMemKB);
        if (source->minQueryMemKB > 0) {
            target->query_mem[1] = INT_ADD(source->minQueryMemKB, source->streamTotalMemKB);
        }

        /*
         * The nodegroup value is adjusted unconditionally (no phase check):
         * this function is only called once per CalculateQueryMemMain run and
         * the final result is wanted.
         */
        AdjustQueryMem(target->query_mem, no_stats, NULL);

        if (VALID_QUERY_MEM()) {
            for (int slot = 0; slot < 2; slot++) {
                target->query_mem[slot] = Max(target->query_mem[slot], source->assigned_query_mem_1);
            }
        }

        MEMCTL_LOG("SetNgQueryMem(%d): nodegroup %u, query_mem[0] %d, query_mem[1] %d",
            ctx->phase,
            ng_oid,
            target->query_mem[0],
            target->query_mem[1]);

        ngIdx++;
    }
}

/*
 * CalculateQueryMemMain
 *	main entry of memory control logic
 *
 * The plan tree is walked with MethodPlanMemCalculateWalker up to three times:
 *	1. ASSIGN_MEM: build the operator groups and assign memory to operators;
 *	2. DECREASE_MEM: only when not in the prepare phase and the computed
 *	   memory exceeds the assigned query memory;
 *	3. ADJUST_MEM (prepare phase) or SPREAD_ADJUST_MEM: final minor adjustment.
 *
 * Parameters:
 *	@in stmt: planned statement with the whole plan generated by the optimizer;
 *		its query_mem (and ng_queryMem in the logic cluster case) is updated
 *	@in use_tenant: if used for tenant, don't spread mem
 *	@in called_by_wlm: the function is called twice, calculating query mem for
 *		the first time, and doing the decrease after the check with CCN, so use
 *		the flag to differentiate the two calls
 *
 * Returns: void
 */
void CalculateQueryMemMain(PlannedStmt* stmt, bool use_tenant, bool called_by_wlm)
{
    MethodPlanWalkerContext ctx;
    errno_t rc = 0;
    rc = memset_s(&ctx, sizeof(MethodPlanWalkerContext), 0, sizeof(MethodPlanWalkerContext));
    securec_check(rc, "\0", "\0");

    exec_init_plan_tree_base(&ctx.base, stmt);

    /*
     * Phase 1: traverse the whole plan tree, to figure out the group info, and
     * assign memory usage to the plan operator
     */
    ctx.dnExec = false;
    ctx.groupTree = NULL;
    ctx.groupNode = NULL;
    ctx.nextGroupId = 0;
    ctx.plannedStmt = stmt;
    ctx.status = 0;
    ctx.phase = ASSIGN_MEM;
    ctx.use_tenant = use_tenant;
    ctx.ng_distributionList = NIL;

    /* generate related logic cluster, not for wlm nor planB */
    if (in_logic_cluster()) {
        Assert(stmt->planTree->exec_nodes);

        if (stmt->planTree->exec_nodes->nodeList) {
            /* distribution in exec_nodes is not reliable sometime, so rebuild it from the node list */
            Distribution* dist = ng_convert_to_distribution(stmt->planTree->exec_nodes->nodeList);
            ctx.ng_distributionList = PgxcGroupGetLogicClusterList(dist->bms_data_nodeids);
            pfree(dist);
        }
    }

    /* slot 0 is for the original (not logic cluster) case, slots 1.. map to the distribution list */
    ctx.ng_num = list_length(ctx.ng_distributionList) + 1;
    ctx.ng_queryMemKBArray = (QueryMemKB*)palloc0(ctx.ng_num * sizeof(QueryMemKB));
    /* Set nodegroup's assigned_query_mem[1] */
    if (ctx.ng_num > 1) {
        SetNgAssignedQueryMem(&ctx, stmt->assigned_query_mem[1]);
    }

    bool result = MethodPlanMemCalculateWalker((Node*)stmt->planTree, &ctx);

    Assert(!result);

    bool no_stats = false;
    bool prepare_phase = false;

    /* phase is still ASSIGN_MEM here; whether AdjustQueryMem runs inside depends on ADJ_PHASE */
    SetQueryMem(&ctx, stmt, no_stats);
    prepare_phase = !called_by_wlm && DY_MEM_ADJ(stmt);

    /* skip phase 2 if run the first time in optimizer && query_mem >= 32M */
    if (prepare_phase) {
        /* Adjust query_mem because we don't do it in last SetQueryMem */
        AdjustQueryMem(stmt->query_mem, no_stats, &ctx);

        /* Clamp both query_mem slots to the assigned maximum */
        if (stmt->query_mem[0] > stmt->assigned_query_mem[1]) {
            stmt->query_mem[0] = stmt->assigned_query_mem[1];
        }
        if (stmt->query_mem[1] > stmt->assigned_query_mem[1]) {
            stmt->query_mem[1] = stmt->assigned_query_mem[1];
        }
        /* Without a separate minimum, the minimum equals the normal value */
        if (stmt->query_mem[1] == 0) {
            stmt->query_mem[1] = stmt->query_mem[0];
        }

        /* Get nodegroup's query_mem */
        SetNgQueryMem(stmt, &ctx, no_stats);
        ctx.phase = ADJUST_MEM;
    } else {
        /*
         * Phase 2: If query memory usage exceeds the threshold, then traverse the
         * whole plan tree, and decrease the mem usage to threshold
         */
        no_stats = contain_single_col_stat(t_thrd.postgres_cxt.g_NoAnalyzeRelNameList);
        bool need_decrease = false;
        if (ctx.ng_num > 1) {
            /* logic cluster case: decrease if any nodegroup exceeds its own assigned memory */
            for (int i = 1; i < ctx.ng_num; i++) {
                need_decrease = need_decrease || (INT_ADD(ctx.ng_queryMemKBArray[i].queryMemKB,
                                                      ctx.ng_queryMemKBArray[i].streamTotalMemKB) >
                                                     ctx.ng_queryMemKBArray[i].assigned_query_mem_1);
                /* available memory excludes stream memory, but never drops below half of STATEMENT_MIN_MEM */
                ctx.ng_queryMemKBArray[i].availMemKB =
                    Max(ctx.ng_queryMemKBArray[i].assigned_query_mem_1 - ctx.ng_queryMemKBArray[i].streamTotalMemKB,
                        (int)(STATEMENT_MIN_MEM * 1024L * HALF_AMOUNT));
            }
        } else {
            need_decrease = stmt->query_mem[0] > stmt->assigned_query_mem[1];
        }

        /* slot 0 gets its available memory in both cases, with the same lower bound */
        ctx.ng_queryMemKBArray[0].availMemKB =
            Max(stmt->assigned_query_mem[1] - ctx.ng_queryMemKBArray[0].streamTotalMemKB,
                (int)(STATEMENT_MIN_MEM * 1024L * HALF_AMOUNT));

        /* some nodegroup need to decrease the mem usage */
        if (need_decrease) {
            ctx.phase = DECREASE_MEM;
            ctx.nextGroupId = 0;
            ctx.groupTreeIdx = 0;
            /* Reset query_mem[1] since maybe no min mem available after mem decrease */
            stmt->query_mem[1] = 0;
            for (int i = 0; i < ctx.ng_num; i++) {
                ctx.ng_queryMemKBArray[i].queryMemKB = 0;
                ctx.ng_queryMemKBArray[i].minQueryMemKB = 0;
                /* For nodegroup's query_mem has not been set yet here, no need to reset */
            }
            exec_init_plan_tree_base(&ctx.base, stmt);

            MEMCTL_LOG("Query decrease mem started.");
            /* NOTE(review): unlike the other two walks, this result is not checked with Assert */
            result = MethodPlanMemCalculateWalker((Node*)stmt->planTree, &ctx);
            MEMCTL_LOG("Query decrease mem ended.");
        }
        ctx.phase = SPREAD_ADJUST_MEM;
    }

    /*
     * Phase 3: Final traverse of the whole plan tree to make minor adjustment
     */
    ctx.nextGroupId = 0;
    ctx.groupTreeIdx = 0;
    exec_init_plan_tree_base(&ctx.base, stmt);

    /* During pre-calculation, we don't have phase 2, also skip query mem setting */
    if (!prepare_phase) {
        /* Re-set query_mem */
        SetQueryMem(&ctx, stmt, no_stats);
        /* Get nodegroup's query_mem */
        SetNgQueryMem(stmt, &ctx, no_stats);
        /* check single logic cluster case: the only nodegroup must match the statement level values */
        if (ctx.ng_num == 2) {
            Assert(stmt->query_mem[0] == stmt->ng_queryMem[0].query_mem[0] &&
                   stmt->query_mem[1] == stmt->ng_queryMem[0].query_mem[1]);
        }
    }
    result = MethodPlanMemCalculateWalker((Node*)stmt->planTree, &ctx);

    Assert(!result);
    ReleaseResource(&ctx);
}

/*
 * MethodPlanMemCalculateWalker
 *	main plan tree walker function, will be used in three phases of plan tree
 *	traverse, see detail from CalculateQueryMemMain()
 *
 * Work is split around the recursive descent: before descending the operator
 * group of the node is created or looked up (and, in the decrease / spread
 * phases, memory is decreased for a newly entered group); after descending the
 * per-node and per-group memory is assigned, recalculated or adjusted
 * according to the current phase.
 *
 * Parameters:
 *	@in node: traverse node in plan tree, and we only handle plan node
 *	@in context: plan tree traverse context
 *
 * Returns: the result of plan_tree_walker for this node (asserted to be false);
 *	false for a NULL node
 */
static bool MethodPlanMemCalculateWalker(Node* node, MethodPlanWalkerContext* context)
{
    if (node == NULL) {
        return false;
    }

    /* If current node is lefttree of a join, then we should keep the flag for the righttree */
    unsigned int origStatus = context->status & UNDER_MULTI_GROUP_OP;

    /* We only care plan node on datanode */
    if (IsPlanNode(node) && context->dnExec) {
        OperatorGroupNode* oldgroup = context->groupNode;

        /* Create or find the group for the current node */
        context->groupNode = CreateOrFindOperatorGroupForOperator(node, context);

        Assert(context->groupNode != NULL);

        /* In the spread phase, record the current memory of the root group (group 0) in slot 0 */
        if (context->phase == SPREAD_ADJUST_MEM && context->groupNode->groupId == 0) {
            context->ng_queryMemKBArray[0].currQueryMemKB = context->groupNode->ng_groupMemKBArray[0].groupMemKB[0];
        }
        /* Only act when this node entered a group different from the one active before it */
        if (context->groupNode != oldgroup) {
            /*
             * For phase 2, recursively decrease mem in current group and to child group
             * For phase 3, try to find how much mem can be auto spread for each group
             */
            if (context->phase == SPREAD_ADJUST_MEM && context->groupNode->groupId == 0) {
                SetMinimumDMem(context->groupNode);
            }
            if (context->phase == DECREASE_MEM || context->phase == SPREAD_ADJUST_MEM) {
                DecreaseMemPerGroup(context->groupNode, context->ng_queryMemKBArray, (context->phase == DECREASE_MEM));
            }
        }
    }

    /* Recurse into the children with this same walker */
    bool result = plan_tree_walker(node, (MethodWalker)MethodPlanMemCalculateWalker, (void*)context);
    Assert(!result);

    /*
     * If this node is the top node in a group, at this point, we should have all info about
     * its child groups. We then calculate the maximum memory usage of potential
     * concurrently active operators in all child groups.
     */
    if (context->dnExec && IsPlanNode(node)) {
        /* for phase 3, we do the minor adjustment to the memory */
        if (ADJ_PHASE(context)) {
            /* Set op mem to 0 to take back to work mem */
            if (context->ng_queryMemKBArray[0].queryMemKB == 0) {
                Plan* pnode = (Plan*)node;
                pnode->operatorMemKB[0] = 0;
                pnode->operatorMaxMem = 0;

                for (int i = 0; i < pnode->ng_num; i++) {
                    pnode->ng_operatorMemKBArray[i][0] = 0;
                }
            } else {
                AdjustMemOpConsumption(context, (Plan*)node);
            }
        /*
         * For phase 1 and phase 2, we should calculate the group memory
         * usage, but in phase 2, we are doing recalculation after memory
         * adjustment, so skip some work
         */
        } else {
            /* For phase 1, assign memory consumption calculated by optimizer */
            if (context->phase == ASSIGN_MEM) {
                if ((Plan*)node == context->groupNode->topNode) {
                    CalChildLevelForGroup(context->groupNode);
                }
                AssignMemOpConsumption(context, (Plan*)node);
            } else {
                /* phase 2: drop the operator's minimum memory */
                ((Plan*)node)->operatorMemKB[1] = 0;
            }

            if ((Plan*)node == context->groupNode->topNode) {
                CalculateGroupMemory(context->groupNode, (context->phase == ASSIGN_MEM));

                /* When traverse is finished (back at group 0), record the totals and print out the whole picture */
                if (context->groupNode->groupId == 0) {
                    for (int i = 0; i < context->ng_num; i++) {
                        context->ng_queryMemKBArray[i].currQueryMemKB =
                            context->groupNode->ng_groupMemKBArray[i].groupMemKB[0];
                        context->ng_queryMemKBArray[i].minCurrQueryMemKB =
                            context->groupNode->ng_groupMemKBArray[i].groupMemKB[1];
                    }
                    PrintEarlyFreeContext(context);
                }
            }
        }

        /* Reset the groupNode to point to its parentGroupNode */
        if ((Plan*)node == context->groupNode->topNode) {
            context->groupNode = GetParentOperatorGroup(context->groupNode);
        }
    }

    /* Restore the multi-group flag saved on entry, it may have been cleared while handling this subtree */
    if (origStatus & UNDER_MULTI_GROUP_OP) {
        context->status |= UNDER_MULTI_GROUP_OP;
    }

    return result;
}

/* ---------------------------------------------------------- */
/*                group and plan memory calculation routines                      */
/* ---------------------------------------------------------- */
/*
 * InitGroupMemory
 *	reset the computed memory figures of a group for every nodegroup slot
 *
 * Both the normal (slot 0) and the minimum (slot 1) value of groupMemKB,
 * groupBlockedOpMemKB, groupConcMemKB and belowStreamConcMemKB are cleared.
 *
 * Parameters:
 *	@in group: operator group to reset
 *
 * Returns: void
 */
static void InitGroupMemory(OperatorGroupNode* group)
{
    for (int ngIdx = 0; ngIdx < group->ng_num; ngIdx++) {
        /* slot 0 is the normal value, slot 1 the minimum value */
        for (int slot = 0; slot < 2; slot++) {
            group->ng_groupMemKBArray[ngIdx].groupMemKB[slot] = 0;
            group->ng_groupMemKBArray[ngIdx].groupBlockedOpMemKB[slot] = 0;
            group->ng_groupMemKBArray[ngIdx].groupConcMemKB[slot] = 0;
            group->ng_groupMemKBArray[ngIdx].belowStreamConcMemKB[slot] = 0;
        }
    }
}

/*
 * GetTopNodeMemKB
 *	get memory usage of top node in the group
 *
 * Parameters:
 *	@in group: input operator group
 *	@in ng_index: nodegroup index
 *	@out minMem: optional; receives the top node's minimum memory when it is
 *		greater than zero, otherwise 0
 *
 * Returns: memory usage of top node in the group
 */
static int GetTopNodeMemKB(OperatorGroupNode* group, int ng_index, int* minMem)
{
    /* [0] is the normal value, [1] the minimum value */
    const int* topNodeMem = group->topNode->ng_operatorMemKBArray[ng_index];

    if (minMem != NULL) {
        *minMem = (topNodeMem[1] > 0) ? topNodeMem[1] : 0;
    }

    return topNodeMem[0];
}

/*
 * GetBlockedOpMem
 *	get blocked operator memory usage of a group
 *
 * Parameters:
 *	@in group: input operator group
 *	@in ng_index: nodegroup index
 *	@out minMem: optional; receives the matching minimum memory when it is
 *		greater than zero, otherwise 0
 *	@in unBlockedMat: if lower group is un blocked material nestloop
 *
 * Returns: memory usage of blocked operator in this group
 */
static int GetBlockedOpMem(OperatorGroupNode* group, int ng_index, int* minMem, bool unBlockedMat)
{
    const int* blockedMem = NULL;

    if (group->virtualGroup || IsRequiredHashJoin(group->topNode, false) || unBlockedMat) {
        /*
         * The top node is not blocked, so blocked op mem should count all
         * the nodes in the group, also including the unblocked op in lower groups
         */
        blockedMem = group->ng_groupMemKBArray[ng_index].groupBlockedOpMemKB;
    } else {
        /*
         * With blocked top node, when returning tuples, all the lower node's
         * memory is already freed, so just use memory usage of the top node itself
         */
        blockedMem = group->topNode->ng_operatorMemKBArray[ng_index];
    }

    /* [0] is the normal value, [1] the minimum value */
    if (minMem != NULL) {
        *minMem = (blockedMem[1] > 0) ? blockedMem[1] : 0;
    }

    return blockedMem[0];
}

/*
 * GetGroupMemKB
 *	get memory usage of total group
 *
 * Parameters:
 *	@in group: input operator group
 *	@in ng_index: nodegroup index
 *	@out minMem: optional; receives the group's minimum memory when it is
 *		greater than zero, otherwise 0
 *
 * Returns: memory usage of total group
 */
static int GetGroupMemKB(OperatorGroupNode* group, int ng_index, int* minMem)
{
    /* [0] is the normal value, [1] the minimum value */
    const int* groupMem = group->ng_groupMemKBArray[ng_index].groupMemKB;

    if (minMem != NULL) {
        *minMem = (groupMem[1] > 0) ? groupMem[1] : 0;
    }

    return groupMem[0];
}

/*
 * GetBelowStreamConcMemKB
 *	get memory usage of below stream conc mem in the group
 *
 * Parameters:
 *	@in group: input operator group
 *	@in ng_index: nodegroup index
 *	@out minMem: optional; receives the minimum below stream conc mem when it
 *		is greater than zero, otherwise 0
 *
 * Returns: memory usage of below stream conc mem in the group
 */
static int GetBelowStreamConcMemKB(OperatorGroupNode* group, int ng_index, int* minMem)
{
    /* [0] is the normal value, [1] the minimum value */
    const int* belowStreamMem = group->ng_groupMemKBArray[ng_index].belowStreamConcMemKB;

    if (minMem != NULL) {
        *minMem = (belowStreamMem[1] > 0) ? belowStreamMem[1] : 0;
    }

    return belowStreamMem[0];
}

/*
 * GroupAddMemory
 *	add memory to base, considering normal and minimal one
 *
 * In both arrays slot 0 is the normal value and slot 1 the minimal value; a
 * minimal value of 0 means "no separate minimum, same as normal".
 *
 * Parameters:
 *	@in/out baseArray: base array with normal and minimal value, updated in place
 *	@in memArray: memory array with normal and minimal value to add
 *	@in check: when true, additionally assert that a non-zero normal value
 *		ends up strictly greater than the minimal value
 *
 * Returns: void
 */
static void GroupAddMemory(int* baseArray, const int* memArray, bool check)
{
    const int normalIdx = 0;
    const int minIdx = 1;

    if (memArray[minIdx] > 0) {
        /* The addend has its own minimum: start the base minimum from the base normal value if unset */
        if (baseArray[minIdx] == 0) {
            baseArray[minIdx] = baseArray[normalIdx];
        }
        baseArray[minIdx] = INT_ADD(baseArray[minIdx], memArray[minIdx]);
    } else if (baseArray[minIdx] > 0) {
        /* Only the base tracks a minimum: the addend contributes its normal value to it */
        baseArray[minIdx] = INT_ADD(baseArray[minIdx], memArray[normalIdx]);
    }

    baseArray[normalIdx] = INT_ADD(baseArray[normalIdx], memArray[normalIdx]);

    /* A minimum equal to the normal value carries no information, store it as 0 */
    if (baseArray[minIdx] == baseArray[normalIdx]) {
        baseArray[minIdx] = 0;
    }

    Assert(baseArray[0] >= 0 && baseArray[1] >= 0);
    Assert(!check || baseArray[0] == 0 || baseArray[0] > baseArray[1]);
}

/*
 * GroupSetMaxMemory
 *	raise base to the maximum of base and mem, considering normal and
 *	minimal one
 *
 * In both arrays slot 0 is the normal value and slot 1 the minimal value; a
 * minimal value of 0 means "no separate minimum, same as normal".
 *
 * Parameters:
 *	@in/out baseArray: base array with normal and minimal value, updated in place
 *	@in memArray: memory array with normal and minimal value to compare with
 *
 * Returns: void
 */
static void GroupSetMaxMemory(int* baseArray, const int* memArray)
{
    const int normalIdx = 0;
    const int minIdx = 1;

    if (baseArray[minIdx] > 0) {
        /* Base tracks a minimum: compare with the candidate's minimum, or its normal value if it has none */
        int candidateMin = (memArray[minIdx] > 0) ? memArray[minIdx] : memArray[normalIdx];
        baseArray[minIdx] = Max(baseArray[minIdx], candidateMin);
    } else if (memArray[minIdx] != 0) {
        /* Only the candidate has a minimum: the base normal value stands in for the base minimum */
        baseArray[minIdx] = Max(baseArray[normalIdx], memArray[minIdx]);
    }

    baseArray[normalIdx] = Max(baseArray[normalIdx], memArray[normalIdx]);
    Assert(baseArray[0] == 0 || baseArray[0] >= baseArray[1]);

    /* A minimum equal to the normal value carries no information, store it as 0 */
    if (baseArray[minIdx] == baseArray[normalIdx]) {
        baseArray[minIdx] = 0;
    }
}

/*
 * GetChildMaxMem
 *	get max group mem of children group, this is used to control memory
 *	usage estimation if it's too huge
 *
 * For a child with a single slot, slot 0 is used; otherwise only the
 * nodegroup slots (1 .. ng_num - 1) are considered.
 *
 * Parameters:
 *	@in group: input group
 *
 * Returns: max memory usage of child group, 0 when there is no child group
 */
static int GetChildMaxMem(OperatorGroupNode* group)
{
    int largestMem = 0;
    ListCell* childCell = NULL;

    foreach (childCell, group->childGroups) {
        OperatorGroupNode* child = (OperatorGroupNode*)lfirst(childCell);
        /* skip slot 0 as soon as there are dedicated nodegroup slots */
        int firstSlot = (child->ng_num == 1) ? 0 : 1;

        for (int slot = firstSlot; slot < child->ng_num; slot++) {
            largestMem = Max(largestMem, child->ng_groupMemKBArray[slot].groupMemKB[0]);
        }
    }

    return largestMem;
}

/*
 * GetStreamMemConsumption
 *	Estimate the buffer memory of one stream node (consumer side plus producer
 *	side) and record it. In ASSIGN_MEM phase the estimate is accumulated into
 *	streamTotalMemKB of the walker context; otherwise it is stored on the plan
 *	node itself. When the node spans several nodegroups, the consumer and
 *	producer parts are additionally booked on the nodegroups they intersect.
 *
 * Parameters:
 *	@in node: stream node
 *	@in context: plan tree walker context
 *
 * Returns: void, results are written into context or node as described above
 */
static void GetStreamMemConsumption(Plan* node, MethodPlanWalkerContext* context)
{
    Stream* streamNode = (Stream*)node;
    int consumerDop = SET_DOP(streamNode->smpDesc.consumerDop);
    int producerDop = SET_DOP(streamNode->smpDesc.producerDop);
    int consumerNum = list_length(streamNode->consumer_nodes->nodeList);
    int producerNum = list_length(streamNode->scan.plan.exec_nodes->nodeList);

    /* A local stream only talks to the local DN, so one peer on each side. */
    if (STREAM_IS_LOCAL_NODE(streamNode->smpDesc.distriType)) {
        consumerNum = 1;
        producerNum = 1;
    }

    /* 1. Memory allocate in StreamPrepareRequest() for StreamConsumer. */
    int consumerMemKB = STREAM_BUFFER_SIZE_KB * (producerNum * producerDop) * consumerDop;

    /* 2. Memory allocate in StreamSCTP::allocNetBuffer() for StreamProducer. */
    /* cache memory cost of each producer dop, the current estimated value is 2MB */
#define CACHE_MEM_COST_PER_DOP_KB 2048

    int producerMemKB = (STREAM_BUFFER_SIZE_KB * (consumerNum * consumerDop) + CACHE_MEM_COST_PER_DOP_KB) * producerDop;

    const bool inAssignPhase = (context->phase == ASSIGN_MEM);
    int wholeStreamMemKB = consumerMemKB + producerMemKB;

    /* Slot 0 always receives the complete consumer + producer amount */
    if (inAssignPhase) {
        context->ng_queryMemKBArray[0].streamTotalMemKB += wholeStreamMemKB;
        MEMCTL_LOG("GetStreamMemConsumption: Node %d, consumer_mem %dKB, producer_mem %dKB",
            node->plan_node_id,
            consumerMemKB,
            producerMemKB);
    } else {
        Assert(ADJ_PHASE(context));
        node->operatorMemKB[0] = wholeStreamMemKB;
        node->ng_operatorMemKBArray[0][0] = node->operatorMemKB[0];
    }

    /* Nothing more to book when the node is not spread over several nodegroups */
    if (node->ng_num <= 1) {
        return;
    }

    /* compute consumer & producer nodegroup in case they are different */
    int ngIdx = 1;
    ListCell* distCell = NULL;

    /* distribution in exec_nodes is not reliable sometime */
    Distribution* consumerDist = ng_convert_to_distribution(streamNode->consumer_nodes->nodeList);
    Distribution* producerDist = ng_convert_to_distribution(streamNode->scan.plan.exec_nodes->nodeList);

    /* Node ids of each side that have not been matched to a nodegroup yet */
    Bitmapset* consumerLeft = consumerDist->bms_data_nodeids;
    Bitmapset* producerLeft = producerDist->bms_data_nodeids;
    Assert(!bms_is_empty(consumerLeft) && !bms_is_empty(producerLeft));

    foreach (distCell, context->ng_distributionList) {
        Distribution* ngDist = (Distribution*)lfirst(distCell);
        Bitmapset* ngNodeids = ngDist->bms_data_nodeids;

        /* consumer nodegroup */
        Bitmapset* overlap = bms_intersect(ngNodeids, consumerLeft);
        if (!bms_is_empty(overlap)) {
            /* Remove the matched ids from the pending consumer set */
            Bitmapset* remaining = bms_difference(consumerLeft, ngNodeids);
            bms_free(consumerLeft);
            consumerLeft = remaining;

            if (inAssignPhase) {
                context->ng_queryMemKBArray[ngIdx].streamTotalMemKB += consumerMemKB;
                MEMCTL_LOG(
                    "GetStreamMemConsumption: Node %d, consumer nodegroup %u", node->plan_node_id, ngDist->group_oid);
            } else {
                node->ng_operatorMemKBArray[ngIdx][0] += consumerMemKB;
            }
        }
        bms_free(overlap);

        /* producer nodegroup */
        overlap = bms_intersect(ngNodeids, producerLeft);
        if (!bms_is_empty(overlap)) {
            /* Remove the matched ids from the pending producer set */
            Bitmapset* remaining = bms_difference(producerLeft, ngNodeids);
            bms_free(producerLeft);
            producerLeft = remaining;

            if (inAssignPhase) {
                context->ng_queryMemKBArray[ngIdx].streamTotalMemKB += producerMemKB;
                MEMCTL_LOG(
                    "GetStreamMemConsumption: Node %d, producer nodegroup %u", node->plan_node_id, ngDist->group_oid);
            } else {
                node->ng_operatorMemKBArray[ngIdx][0] += producerMemKB;
            }
        }
        bms_free(overlap);

        /* Stop as soon as both sides are fully matched */
        if (bms_is_empty(consumerLeft) && bms_is_empty(producerLeft)) {
            break;
        }
        ngIdx++;
    }

    Assert(bms_is_empty(consumerLeft) && bms_is_empty(producerLeft));
    bms_free(consumerLeft);
    bms_free(producerLeft);
    pfree(consumerDist);
    pfree(producerDist);
}

/*
 * CalcJoinConcMem
 *	Calculate the concurrent memory usage of a group with blocked join as top node.
 *	The function works in one of two modes, selected by minMem:
 *	- minMem != NULL: normal and min memory are tracked together in two-slot
 *	  arrays; the max over all phases is returned and its min part is stored
 *	  in *minMem. buildMaxMem, probeMaxMem and build0MaxMem are not written.
 *	- minMem == NULL: the per-phase values are written to the out parameters
 *	  and their maximum is returned.
 *
 * Parameters:
 *	@in group: input group node
 *	@out buildMaxMem: return max memory usage during build phase
 *	@out probeMaxMem: return max memory usage during probe phase
 *	@out build0MaxMem: only written for hashjoin, return max memory usage
 *			during scanning the build tree, before hashtable building
 *	@in ng_index: nodegroup index
 *	@out minMem: if not NULL, receives the min mem part of the result
 *
 * Returns: concurrent memory usage of the node, 0 if the top node matches
 *	none of the three handled join kinds
 */
static int CalcJoinConcMem(
    OperatorGroupNode* group, int* buildMaxMem, int* probeMaxMem, int* build0MaxMem, int ng_index, int* minMem)
{
    Plan* joinNode = group->topNode;
    int buildPhaseMem[2] = {0};
    int probePhaseMem[2] = {0};
    int preBuildPhaseMem[2] = {0};
    /* Scratch pair for helper results, and finally the max over all phases */
    int pairMem[2] = {0};

    Assert(IsBlockedJoinNode(joinNode));

    OperatorGroupNode* leftGroup = (OperatorGroupNode*)linitial(group->childGroups);
    OperatorGroupNode* rightGroup = (OperatorGroupNode*)lsecond(group->childGroups);

    if (IsRequiredHashJoin(joinNode, true)) {
        if (minMem != NULL) {
            /*
             * when executing right tree, the top memory includes the total mem of right tree,
             * and below stream part of lefttree
             */
            pairMem[0] = GetGroupMemKB(rightGroup, ng_index, &pairMem[1]);
            GroupAddMemory(preBuildPhaseMem, pairMem);
            pairMem[0] = GetBelowStreamConcMemKB(leftGroup, ng_index, &pairMem[1]);
            GroupAddMemory(preBuildPhaseMem, pairMem);

            /*
             * when building hash table, the top memory includes the blocked op of right tree,
             * hash join op and below stream part of lefttree
             */
            pairMem[0] = GetTopNodeMemKB(group, ng_index, &pairMem[1]);
            GroupAddMemory(buildPhaseMem, pairMem);
            pairMem[0] = GetBelowStreamConcMemKB(leftGroup, ng_index, &pairMem[1]);
            GroupAddMemory(buildPhaseMem, pairMem);
            pairMem[0] = GetBlockedOpMem(rightGroup, ng_index, &pairMem[1]);
            GroupAddMemory(buildPhaseMem, pairMem);

            /* when probing lefttree, the top memory includes whole lefttree and hashjoin op */
            pairMem[0] = GetTopNodeMemKB(group, ng_index, &pairMem[1]);
            GroupAddMemory(probePhaseMem, pairMem);
            pairMem[0] = GetGroupMemKB(leftGroup, ng_index, &pairMem[1]);
            GroupAddMemory(probePhaseMem, pairMem);

            /* Fold every phase into pairMem, which still holds the last helper result */
            GroupSetMaxMemory(pairMem, buildPhaseMem);
            GroupSetMaxMemory(pairMem, probePhaseMem);
            GroupSetMaxMemory(pairMem, preBuildPhaseMem);
            *minMem = pairMem[1];
            return pairMem[0];
        }

        /* Same three phases as above, written straight to the out parameters */
        *build0MaxMem =
            INT_ADD(GetGroupMemKB(rightGroup, ng_index), GetBelowStreamConcMemKB(leftGroup, ng_index));
        *buildMaxMem =
            INT_ADD(INT_ADD(GetTopNodeMemKB(group, ng_index), GetBelowStreamConcMemKB(leftGroup, ng_index)),
                GetBlockedOpMem(rightGroup, ng_index));
        *probeMaxMem = INT_ADD(GetTopNodeMemKB(group, ng_index), GetGroupMemKB(leftGroup, ng_index));

        return Max(Max(*buildMaxMem, *probeMaxMem), *build0MaxMem);
    } else if (IsA(joinNode, MergeJoin) || IsA(joinNode, VecMergeJoin)) {
        if (minMem != NULL) {
            /*
             * when scanning the lefttree, the top memory includes the top mem of left tree,
             * and below stream part of right tree
             */
            pairMem[0] = GetGroupMemKB(leftGroup, ng_index, &pairMem[1]);
            GroupAddMemory(buildPhaseMem, pairMem);
            pairMem[0] = GetBelowStreamConcMemKB(rightGroup, ng_index, &pairMem[1]);
            GroupAddMemory(buildPhaseMem, pairMem);

            /* When scan the right tree, the top memory includes the blocked op of lefttree and whole right tree */
            pairMem[0] = GetTopNodeMemKB(group, ng_index, &pairMem[1]);
            GroupAddMemory(probePhaseMem, pairMem);
            pairMem[0] = GetGroupMemKB(rightGroup, ng_index, &pairMem[1]);
            GroupAddMemory(probePhaseMem, pairMem);
            pairMem[0] = GetBlockedOpMem(leftGroup, ng_index, &pairMem[1]);
            GroupAddMemory(probePhaseMem, pairMem);

            GroupSetMaxMemory(pairMem, buildPhaseMem);
            GroupSetMaxMemory(pairMem, probePhaseMem);
            *minMem = pairMem[1];
            return pairMem[0];
        }

        /* Same two phases as above, written straight to the out parameters */
        *buildMaxMem =
            INT_ADD(GetGroupMemKB(leftGroup, ng_index), GetBelowStreamConcMemKB(rightGroup, ng_index));
        *probeMaxMem = INT_ADD(INT_ADD(GetTopNodeMemKB(group, ng_index), GetGroupMemKB(rightGroup, ng_index)),
            GetBlockedOpMem(leftGroup, ng_index));

        return Max(*buildMaxMem, *probeMaxMem);
    } else if (IsRequiredNestLoop(joinNode, true)) {
        if (minMem != NULL) {
            /*
             * when executing right tree, the top memory includes the total mem of right tree,
             * and below stream part of lefttree
             */
            pairMem[0] = GetGroupMemKB(rightGroup, ng_index, &pairMem[1]);
            GroupAddMemory(buildPhaseMem, pairMem);
            pairMem[0] = GetBelowStreamConcMemKB(leftGroup, ng_index, &pairMem[1]);
            GroupAddMemory(buildPhaseMem, pairMem);

            /* when probing lefttree, the top memory includes whole lefttree and materialize */
            pairMem[0] = GetTopNodeMemKB(group, ng_index, &pairMem[1]);
            GroupAddMemory(probePhaseMem, pairMem);
            pairMem[0] = GetGroupMemKB(leftGroup, ng_index, &pairMem[1]);
            GroupAddMemory(probePhaseMem, pairMem);
            pairMem[0] = GetBlockedOpMem(rightGroup, ng_index, &pairMem[1]);
            GroupAddMemory(probePhaseMem, pairMem);

            GroupSetMaxMemory(pairMem, buildPhaseMem);
            GroupSetMaxMemory(pairMem, probePhaseMem);
            *minMem = pairMem[1];
            return pairMem[0];
        }

        /* Same two phases as above, written straight to the out parameters */
        *buildMaxMem =
            INT_ADD(GetGroupMemKB(rightGroup, ng_index), GetBelowStreamConcMemKB(leftGroup, ng_index));
        *probeMaxMem = INT_ADD(INT_ADD(GetTopNodeMemKB(group, ng_index), GetGroupMemKB(leftGroup, ng_index)),
            GetBlockedOpMem(rightGroup, ng_index));

        return Max(*probeMaxMem, *buildMaxMem);
    }

    return 0;
}

/*
 * CalculateGroupMemory
 *	Main entry of group memory usage calculation. The group memory is reset,
 *	seeded with the memory of the group's own operators, then completed from
 *	the join branches (blocked join only) and from the child groups.
 *
 * Parameters:
 *	@in group: input group node
 *	@in firstTime: if it's the first traverse. If so, we should make below group lists.
 *
 * Returns: void
 */
static void CalculateGroupMemory(OperatorGroupNode* group, bool firstTime)
{
    Plan* topPlan = group->topNode;

    InitGroupMemory(group);

    /* Seed the blocked op memory of every nodegroup with the group's own operator memory */
    for (int ngIdx = 0; ngIdx < group->ng_num; ngIdx++) {
        GroupAddMemory(
            group->ng_groupMemKBArray[ngIdx].groupBlockedOpMemKB, group->ng_groupMemKBArray[ngIdx].groupOpMemKB);
    }

    /* The two branches of a blocked join are handled by a dedicated routine first */
    if (IsBlockedJoinNode(topPlan)) {
        CalculateBlockedJoinGroupMemory(group, firstTime);
    }

    /*
     * Finally account for the child groups. Children coexist unless the top
     * node is an append node, and for a blocked join the two join branches are
     * skipped since they were handled above.
     */
    bool childrenCoexist = !IsAppendNode(topPlan);
    bool skipJoinBranches = IsBlockedJoinNode(topPlan);
    CalculateGroupMemFromChild(group, childrenCoexist, skipJoinBranches, firstTime);
}

/*
 * CalculateBlockedJoinGroupMemory
 *	Calculate memory usage of group with blocked join node as top node.
 *	Only the lefttree and righttree of the join node are handled here; any
 *	other child group (e.g. sublinks) is left to CalculateGroupMemFromChild().
 *
 * Parameters:
 *	@in group: input group node
 *	@in firstTime: if it's the first traverse. If so, we should make below group lists.
 *
 * Returns: void
 */
static void CalculateBlockedJoinGroupMemory(OperatorGroupNode* group, bool firstTime)
{
    Plan* node = group->topNode;
    Assert(IsBlockedJoinNode(node));

    OperatorGroupNode* leftTreeGroup = (OperatorGroupNode*)linitial(group->childGroups);
    OperatorGroupNode* rightTreeGroup = (OperatorGroupNode*)lsecond(group->childGroups);

    /*
     * Only for the first traverse, below stream group list is made.
     * Note: BOTH inputs are copied. list_concat links the cells of its second
     * argument into the result (and returns the second list itself when the
     * first one is empty), so passing the right child's list directly would
     * make this group share list cells with the child, and a later append to
     * this group's list would corrupt the child's list.
     */
    if (firstTime) {
        group->belowStreamGroups = list_concat(
            list_copy(leftTreeGroup->belowStreamGroups), list_copy(rightTreeGroup->belowStreamGroups));
    }

    for (int i = 0; i < group->ng_num; i++) {
        int build0MaxMem = 0;
        int buildMaxMem = 0;
        int probeMaxMem = 0;
        /* Scratch pair: [0] is the normal value, [1] the min value (only requested on first traverse) */
        int blockedOpMem[2] = {0};

        /*
         * Calculate blocked op mem for three join methods separately.
         *	1. For blocked hashjoin, only lefttree can coexist with above tree.
         *	2. For mergejoin, blocked lefttree and righttree can coexist with above tree.
         *	3. For blocked nestloop, the same case as mergejoin.
         */
        if (IsRequiredHashJoin(node, true)) {
            blockedOpMem[0] = GetBlockedOpMem(leftTreeGroup, i, firstTime ? &blockedOpMem[1] : NULL);
            GroupAddMemory(group->ng_groupMemKBArray[i].groupBlockedOpMemKB, blockedOpMem);
        } else if (IsA(node, MergeJoin) || IsA(node, VecMergeJoin)) {
            blockedOpMem[0] = GetBlockedOpMem(leftTreeGroup, i, firstTime ? &blockedOpMem[1] : NULL);
            GroupAddMemory(group->ng_groupMemKBArray[i].groupBlockedOpMemKB, blockedOpMem);

            /* If there's materialize on top of sort, it's not blocked */
            blockedOpMem[0] =
                GetBlockedOpMem(rightTreeGroup, i, firstTime ? &blockedOpMem[1] : NULL, IsJoinRightTreeUnBlocked(node));
            GroupAddMemory(group->ng_groupMemKBArray[i].groupBlockedOpMemKB, blockedOpMem);
        } else {
            Assert(IsRequiredNestLoop(node, true));
            blockedOpMem[0] = GetBlockedOpMem(leftTreeGroup, i, firstTime ? &blockedOpMem[1] : NULL);
            GroupAddMemory(group->ng_groupMemKBArray[i].groupBlockedOpMemKB, blockedOpMem);

            /* Right tree is queried without the unblocked flag here, unlike the mergejoin case */
            blockedOpMem[0] = GetBlockedOpMem(rightTreeGroup, i, firstTime ? &blockedOpMem[1] : NULL);
            GroupAddMemory(group->ng_groupMemKBArray[i].groupBlockedOpMemKB, blockedOpMem);
        }

        /* Calculate group concurrent memory consumption */
        blockedOpMem[0] =
            CalcJoinConcMem(group, &buildMaxMem, &probeMaxMem, &build0MaxMem, i, firstTime ? &blockedOpMem[1] : NULL);
        GroupAddMemory(group->ng_groupMemKBArray[i].groupConcMemKB, blockedOpMem);

        /* Join below stream concurrent memory comes from both branches */
        blockedOpMem[0] = GetBelowStreamConcMemKB(leftTreeGroup, i, firstTime ? &blockedOpMem[1] : NULL);
        GroupAddMemory(group->ng_groupMemKBArray[i].belowStreamConcMemKB, blockedOpMem);
        blockedOpMem[0] = GetBelowStreamConcMemKB(rightTreeGroup, i, firstTime ? &blockedOpMem[1] : NULL);
        GroupAddMemory(group->ng_groupMemKBArray[i].belowStreamConcMemKB, blockedOpMem);

        /* Take the max of the three quotas as group total mem */
        GroupSetMaxMemory(group->ng_groupMemKBArray[i].groupMemKB, group->ng_groupMemKBArray[i].groupConcMemKB);
        GroupSetMaxMemory(group->ng_groupMemKBArray[i].groupMemKB, group->ng_groupMemKBArray[i].belowStreamConcMemKB);
        GroupSetMaxMemory(group->ng_groupMemKBArray[i].groupMemKB, group->ng_groupMemKBArray[i].groupBlockedOpMemKB);
    }
}

/*
 * CalculateGroupMemFromChild
 *	Calculate memory usage of group from the child groups. Since below stream
 *	groups will coexist, and the memory above stream groups can be exclusive,
 *	the two parts are calculated separately.
 *
 * Parameters:
 *	@in group: input group node
 *	@in childGroupCoexist: if child groups coexist, true for non-append group
 *	@in isBlockedJoin: if the top node of group is blocked join, if so the two
 *				join branches are skipped since they have been calculated already
 *	@in firstTime: if it's the first traverse. If so, we should make below group lists.
 *
 * Returns: void
 */
static void CalculateGroupMemFromChild(
    OperatorGroupNode* group, bool childGroupCoexist, bool isBlockedJoin, bool firstTime)
{
    ListCell *lc = NULL;
    ListCell *lc2 = NULL;

    for (int j = 0; j < group->ng_num; j++) {
        int belowStreamTotalMem[2] = {0};
        int aboveStreamMemInc[2] = {0};
        /* Scratch pair: [0] is the normal value, [1] the min value (only requested on first traverse) */
        int opMem[2] = {0};
        int childMaxGroupOpMemKB[2] = {0};
        /* Position of the current child inside group->childGroups */
        int i = 0;

        foreach (lc, group->childGroups) {
            OperatorGroupNode* childGroup = (OperatorGroupNode*)lfirst(lc);
            int belowStreamMem[2] = {0};

            /*
             * For blocked join, skip the first two branches. The position must
             * still advance here, otherwise it would stay at 0 and every
             * following child group would be skipped as well.
             */
            if (isBlockedJoin && i <= RIGHTTREE_POS) {
                i++;
                continue;
            }

            /* Calculate the below stream group total memory usage */
            if (childGroup->underStreamGroup) {
                opMem[0] = GetGroupMemKB(childGroup, j, firstTime ? &opMem[1] : NULL);
                GroupAddMemory(belowStreamMem, opMem);
            } else {
                foreach (lc2, childGroup->belowStreamGroups) {
                    OperatorGroupNode* belowStreamGroup = (OperatorGroupNode*)lfirst(lc2);
                    opMem[0] = GetGroupMemKB(belowStreamGroup, j, firstTime ? &opMem[1] : NULL);
                    GroupAddMemory(belowStreamMem, opMem);
                }
            }

            GroupAddMemory(belowStreamTotalMem, belowStreamMem);

            /*
             * Calculate above stream memory increment and blocked op mem for coexist and exclusive case.
             * The increment is the child group memory minus its below stream part.
             * NOTE(review): the min-mem fallback below uses opMem[0] after it has
             * already been reduced by belowStreamMem[0] -- confirm this is intended.
             */
            opMem[0] = GetGroupMemKB(childGroup, j, firstTime ? &opMem[1] : NULL);
            opMem[0] -= belowStreamMem[0];
            if (opMem[1] > 0) {
                opMem[1] -= (belowStreamMem[1] > 0) ? belowStreamMem[1] : belowStreamMem[0];
            } else if (belowStreamMem[1] > 0) {
                opMem[1] = opMem[0] - belowStreamMem[1];
            }

            if (childGroupCoexist) {
                /* Coexisting children: increments and blocked op memory add up */
                GroupAddMemory(aboveStreamMemInc, opMem, false);

                /* For non-blocked nestloop, we should count all the blocked op of right tree */
                if (i == RIGHTTREE_POS && IsJoinRightTreeUnBlocked(group->topNode)) {
                    opMem[0] = GetBlockedOpMem(childGroup, j, firstTime ? &opMem[1] : NULL, true);
                    GroupAddMemory(group->ng_groupMemKBArray[j].groupBlockedOpMemKB, opMem);
                } else {
                    opMem[0] = GetBlockedOpMem(childGroup, j, firstTime ? &opMem[1] : NULL);
                    GroupAddMemory(group->ng_groupMemKBArray[j].groupBlockedOpMemKB, opMem);
                }
            } else {
                /*
                 * Exclusive children: only the largest one counts. Record the
                 * above stream increment BEFORE opMem is reused for the blocked
                 * op memory, so that the increment is not lost.
                 */
                GroupSetMaxMemory(aboveStreamMemInc, opMem);
                opMem[0] = GetBlockedOpMem(childGroup, j, firstTime ? &opMem[1] : NULL);
                GroupSetMaxMemory(childMaxGroupOpMemKB, opMem);
            }

            /* Record below stream mem and add the below stream groups to list for the first time */
            GroupAddMemory(group->ng_groupMemKBArray[j].belowStreamConcMemKB, belowStreamMem);

            /* The list is shared by all nodegroups, so build it only for the first j */
            if (j == 0 && firstTime) {
                if (childGroup->underStreamGroup) {
                    group->belowStreamGroups = lappend(group->belowStreamGroups, childGroup);
                } else {
                    group->belowStreamGroups =
                        list_concat(group->belowStreamGroups, list_copy(childGroup->belowStreamGroups));
                }
            }

            /* Debug-only consistency check of the child's recorded below stream memory */
            if (firstTime && !childGroup->underStreamGroup) {
                opMem[0] = GetBelowStreamConcMemKB(childGroup, j, &opMem[1]);
                Assert(opMem[0] == belowStreamMem[0] && opMem[1] == belowStreamMem[1]);
            }

            i++;
        }

        /* For exclusive children, only the largest blocked op memory is added */
        if (!childGroupCoexist) {
            GroupAddMemory(group->ng_groupMemKBArray[j].groupBlockedOpMemKB, childMaxGroupOpMemKB);
        }

        /* Finalize the total concurrent mem as the sum of below stream mem and above stream mem increment */
        GroupAddMemory(group->ng_groupMemKBArray[j].groupConcMemKB, belowStreamTotalMem);
        GroupAddMemory(group->ng_groupMemKBArray[j].groupConcMemKB, aboveStreamMemInc);
        GroupSetMaxMemory(group->ng_groupMemKBArray[j].groupMemKB, group->ng_groupMemKBArray[j].groupConcMemKB);
        GroupSetMaxMemory(group->ng_groupMemKBArray[j].groupMemKB, group->ng_groupMemKBArray[j].belowStreamConcMemKB);
        GroupSetMaxMemory(group->ng_groupMemKBArray[j].groupMemKB, group->ng_groupMemKBArray[j].groupBlockedOpMemKB);
    }
}

/* ---------------------------------------------------------- */
/*                        memory usage decrease routines                              */
/* ---------------------------------------------------------- */
/*
 * SetMinimumDMem
 *	In memory spread case, negative values are used for the spread mem amount.
 *	So that later max operations work, every decrease field that is still 0 is
 *	replaced by -INT_MAX, for this group and recursively for all child groups.
 *
 * Parameters:
 *	@in group: input group node
 *
 * Returns: void
 */
static void SetMinimumDMem(OperatorGroupNode* group)
{
    int ngIdx = 0;

    /* Only untouched (zero) fields are replaced, already assigned values are kept */
    while (ngIdx < group->ng_num) {
        if (group->ng_groupMemKBArray[ngIdx].decreaseMemKB == 0) {
            group->ng_groupMemKBArray[ngIdx].decreaseMemKB = -INT_MAX;
        }
        if (group->ng_groupMemKBArray[ngIdx].topNodeDMemKB == 0) {
            group->ng_groupMemKBArray[ngIdx].topNodeDMemKB = -INT_MAX;
        }
        if (group->ng_groupMemKBArray[ngIdx].groupOpDMemKB == 0) {
            group->ng_groupMemKBArray[ngIdx].groupOpDMemKB = -INT_MAX;
        }
        ngIdx++;
    }

    /* Apply the same initialization to the whole sub tree */
    ListCell* childCell = NULL;
    foreach (childCell, group->childGroups) {
        SetMinimumDMem((OperatorGroupNode*)lfirst(childCell));
    }
}

/*
 * DecreaseMemPerGroup
 *	Memory decrease logic main entry for one group, handled per nodegroup. The
 *	wanted decrease is distributed on the three quota (below stream mem, op
 *	mem, concurrent mem); when decreaseMem is true the top node memory is
 *	actually cut and the pending decrease amounts of the group are reset.
 *	At the end the per-nodegroup result is written back to operatorMemKB.
 *
 * Parameters:
 *	@in group: input group node
 *	@in ng_queryMemKBArray: availMem, the target mem should decrease to
 *	@in decreaseMem: true for phase 2, and availMem is smaller than groupMem,
 *					or false for phase 3, and availMem is bigger than groupMem
 *
 * Returns: void
 */
static void DecreaseMemPerGroup(OperatorGroupNode* group, QueryMemKB* ng_queryMemKBArray, bool decreaseMem)
{
    Plan* topNode = group->topNode;

    /* decrease mem for each nodegroup */
    for (int ng = 0; ng < group->ng_num; ng++) {
        /* Keep an already recorded decrease if it is larger than the gap to the available mem */
        group->ng_groupMemKBArray[ng].decreaseMemKB = Max(group->ng_groupMemKBArray[ng].decreaseMemKB,
            group->ng_groupMemKBArray[ng].groupMemKB[0] - ng_queryMemKBArray[ng].availMemKB);

        MEMCTL_LOG("[DecreaseMemPerGroup] Group %d expects to decrease mem: %dKB(%d)",
            group->groupId,
            group->ng_groupMemKBArray[ng].decreaseMemKB,
            ng);

        /* Do the memory reduction on three quota separately */
        AssignDecreaseMemToChild(group, ng, decreaseMem);
        DecreaseOpMem(group, ng, decreaseMem);
        AssignConcDecreaseMemToChild(group, ng, decreaseMem);

        /* Everything below only applies when memory is really decreased */
        if (!decreaseMem) {
            continue;
        }

        /* Do the memory deduction on top node of group itself according to decrease amount */
        if (group->ng_groupMemKBArray[ng].topNodeDMemKB > 0) {
            int memBeforeCut = topNode->ng_operatorMemKBArray[ng][0];

            /* For serial query, we shouldn't decrease a lot if plan is memory sensitive */
            if (t_thrd.wlm_cxt.thread_climgr->active_statements <= 1 && IsMemorySensitiveNode(topNode)) {
                topNode->ng_operatorMemKBArray[ng][0] -= group->ng_groupMemKBArray[ng].topNodeDMemKB;
            } else {
                topNode->ng_operatorMemKBArray[ng][0] =
                    Min((int)(topNode->ng_operatorMemKBArray[ng][0] * DECREASED_MIN_CMP_GAP),
                        topNode->ng_operatorMemKBArray[ng][0] - group->ng_groupMemKBArray[ng].topNodeDMemKB);
            }

            /* Abnormal case: no enough memory to decrease, so log it and fall back to the minimum */
            if (topNode->ng_operatorMemKBArray[ng][0] < 0) {
                int shortfall = -topNode->ng_operatorMemKBArray[ng][0];
                if (shortfall >= MIN_OP_MEM) {
                    MEMCTL_LOG("[Node Mem Decrease Miss]: Node %d: %dKB(%d) miss.",
                        topNode->plan_node_id,
                        shortfall,
                        ng);
                }
                topNode->ng_operatorMemKBArray[ng][0] = MIN_OP_MEM;
            }

            MEMCTL_LOG("Node %d decrease mem to %dKB(%d)",
                topNode->plan_node_id,
                topNode->ng_operatorMemKBArray[ng][0],
                ng);

            /* Reflect the cut of the top node in the group's own op memory */
            group->ng_groupMemKBArray[ng].groupOpMemKB[0] -= memBeforeCut - topNode->ng_operatorMemKBArray[ng][0];
            group->ng_groupMemKBArray[ng].groupOpMemKB[1] -=
                Max(0, topNode->ng_operatorMemKBArray[ng][1] - topNode->ng_operatorMemKBArray[ng][0]);
            /* A min value that is not below the normal value is stored as 0 */
            if (group->ng_groupMemKBArray[ng].groupOpMemKB[1] >= group->ng_groupMemKBArray[ng].groupOpMemKB[0]) {
                group->ng_groupMemKBArray[ng].groupOpMemKB[1] = 0;
            }
        }

        /* The pending decrease amounts are consumed now */
        group->ng_groupMemKBArray[ng].topNodeDMemKB = 0;
        group->ng_groupMemKBArray[ng].decreaseMemKB = 0;
        group->ng_groupMemKBArray[ng].groupOpDMemKB = 0;
    }

    /* set back to operatorMemKB after loop */
    topNode->operatorMemKB[0] = topNode->ng_operatorMemKBArray[0][0];
    topNode->operatorMemKB[1] = topNode->ng_operatorMemKBArray[0][1];
    if (group->ng_num <= 1) {
        return;
    }

    /*
     * With several nodegroups, take the smallest positive per-nodegroup value.
     * NOTE(review): if no slot is positive the result stays INT_MAX -- confirm
     * callers can never reach that state.
     */
    int smallestMem = INT_MAX;
    for (int ng = 1; ng < group->ng_num; ng++) {
        /* operator is process on this nodegroup */
        if (topNode->ng_operatorMemKBArray[ng][0] > 0) {
            smallestMem = Min(smallestMem, topNode->ng_operatorMemKBArray[ng][0]);
        }
    }

    topNode->operatorMemKB[0] = smallestMem;
}

/*
 * AssignDMemToTopNode
 *	Atomic routine that records the wanted memory decrease on the top node of
 *	a group. The recorded amount is only ever raised, never lowered.
 *
 * Parameters:
 *	@in group: input group node
 *	@in totalOpMem: total op mem of several pending mem-decrease
 *		ops, used to decrease mem of the ops in proportion.
 *		When it's 0, dMem is taken as is without proportion calculation
 *	@in dMem: how many memory should be decreased among several ops
 *	@in funcname: name of the calling function, used for debug log
 *	@in ng_index: nodegroup index
 *
 * Returns: void
 */
static void AssignDMemToTopNode(OperatorGroupNode* group, int totalOpMem, int dMem, const char* funcname, int ng_index)
{
    int wantedDMem;

    if (totalOpMem == 0) {
        wantedDMem = dMem;
    } else {
        wantedDMem = MEM_DECREASE(GetTopNodeMemKB(group, ng_index), dMem);
    }

    /* Keep the larger of the previously recorded and the newly wanted amount */
    group->ng_groupMemKBArray[ng_index].topNodeDMemKB =
        Max(group->ng_groupMemKBArray[ng_index].topNodeDMemKB, wantedDMem);

    MEMCTL_LOG("[%s] Node %d expects to decrease mem %dKB(%d)",
        funcname,
        group->topNode->plan_node_id,
        group->ng_groupMemKBArray[ng_index].topNodeDMemKB,
        ng_index);
}

/*
 * AssignDMemToBlockedOp
 *	Atomic routine that records the wanted memory decrease for the blocked op
 *	of a group. There are two cases: when the top node is a blocked op (and is
 *	not flagged as unblocked materialize), the decrease is recorded on the top
 *	node; otherwise it is recorded on the blocked ops mem of the group.
 *	The recorded amounts are only ever raised, never lowered.
 *
 * Parameters:
 *	@in group: input group node
 *	@in totalOpMem: total op mem of several pending mem-decrease
 *		ops, used to decrease mem of the ops in proportion.
 *		When it's 0, dMem is taken as is without proportion calculation
 *	@in dMem: how many memory should be decreased among several
 *		groups and ops
 *	@in funcname: name of the calling function, used for debug log
 *	@in ng_index: nodegroup index
 *	@in unBlockedMat: if the node is unblocked materialize. This is an
 *		exception, since materialize always treats as blocked.
 *
 * Returns: void
 */
static void AssignDMemToBlockedOp(
    OperatorGroupNode* group, int totalOpMem, int dMem, const char* funcname, int ng_index, bool unBlockedMat)
{
    int wantedDMem;

    if (!(IsTopNodeBlockedOp(group) && !unBlockedMat)) {
        /* Decrease is spread over the blocked ops of the group */
        if (totalOpMem == 0) {
            wantedDMem = dMem;
        } else {
            wantedDMem = MEM_DECREASE(GetBlockedOpMem(group, ng_index, NULL, unBlockedMat), dMem);
        }

        group->ng_groupMemKBArray[ng_index].groupOpDMemKB =
            Max(group->ng_groupMemKBArray[ng_index].groupOpDMemKB, wantedDMem);
        MEMCTL_LOG("[%s] Group %d expects to decrease op mem: %dKB(%d)",
            funcname,
            group->groupId,
            group->ng_groupMemKBArray[ng_index].groupOpDMemKB,
            ng_index);
        return;
    }

    /* The top node itself is the blocked op, so only its memory is decreased */
    if (totalOpMem == 0) {
        wantedDMem = dMem;
    } else {
        wantedDMem = MEM_DECREASE(group->topNode->ng_operatorMemKBArray[ng_index][0], dMem);
    }

    group->ng_groupMemKBArray[ng_index].topNodeDMemKB =
        Max(group->ng_groupMemKBArray[ng_index].topNodeDMemKB, wantedDMem);
    MEMCTL_LOG("[%s] Node %d expects to decrease mem %dKB(%d)",
        funcname,
        group->topNode->plan_node_id,
        group->ng_groupMemKBArray[ng_index].topNodeDMemKB,
        ng_index);
}

/*
 * AssignDMemToGroup
 *	Atomic routine that records the wanted memory decrease on a whole group.
 *	The recorded amount is only ever raised, never lowered.
 *
 * Parameters:
 *	@in group: input group node
 *	@in totalOpMem: total op mem of several pending mem-decrease
 *		ops, used to decrease mem of the ops in proportion.
 *		When it's 0, dMem is taken as is without proportion calculation
 *	@in dMem: how many memory should be decreased among several groups
 *	@in funcname: name of the calling function, used for debug log
 *	@in ng_index: nodegroup index
 *
 * Returns: void
 */
static void AssignDMemToGroup(OperatorGroupNode* group, int totalOpMem, int dMem, const char* funcname, int ng_index)
{
    int wantedDMem;

    if (totalOpMem == 0) {
        wantedDMem = dMem;
    } else {
        wantedDMem = MEM_DECREASE(group->ng_groupMemKBArray[ng_index].groupMemKB[0], dMem);
    }

    /* Keep the larger of the previously recorded and the newly wanted amount */
    group->ng_groupMemKBArray[ng_index].decreaseMemKB =
        Max(group->ng_groupMemKBArray[ng_index].decreaseMemKB, wantedDMem);

    MEMCTL_LOG("[%s] Group %d expects to decrease group mem: %dKB(%d)",
        funcname,
        group->groupId,
        group->ng_groupMemKBArray[ng_index].decreaseMemKB,
        ng_index);
}

/*
 * AssignDecreaseMemToChild
 *	Memory adjust logic on below stream mem quota. The desired amount is handed
 *	to the below stream children groups chosen by ChooseDMemItems, each of them
 *	receiving it through AssignDMemToGroup together with the chosen total mem.
 *
 * Parameters:
 *	@in group: input group node
 *	@in ng_index: nodegroup index
 *	@in decreaseMem: true for phase 2, and availMem is smaller than groupMem,
 *					or false for phase 3, and availMem is bigger than groupMem
 *
 * Returns: void
 */
static void AssignDecreaseMemToChild(OperatorGroupNode* group, int ng_index, bool decreaseMem)
{
    /*
     * Amount to adjust: below stream concurrent mem minus what the group
     * keeps once its own pending decrease is taken away
     */
    int belowStreamGrpDMem =
        group->ng_groupMemKBArray[ng_index].belowStreamConcMemKB[0] -
        (group->ng_groupMemKBArray[ng_index].groupMemKB[0] - group->ng_groupMemKBArray[ng_index].decreaseMemKB);

    /* The sign of the amount has to match the requested direction, else nothing to do */
    if (decreaseMem) {
        if (belowStreamGrpDMem <= 0) {
            return;
        }
    } else if (belowStreamGrpDMem >= 0) {
        return;
    }

    MEMCTL_LOG("[AssignDecreaseMemToChild] Group %d expects to decrease belowStream mem: %dKB(%d)",
        group->groupId,
        belowStreamGrpDMem,
        ng_index);

    int childNum = list_length(group->belowStreamGroups);

    /* No child to take the amount: only report it when it is at least MIN_OP_MEM in size */
    if (childNum == 0) {
        if (belowStreamGrpDMem >= MIN_OP_MEM || belowStreamGrpDMem <= -MIN_OP_MEM) {
            MEMCTL_LOG("[BelowStream Mem Decrease Miss(Group)]: group %d: %dKB(%d) miss.",
                group->groupId,
                belowStreamGrpDMem,
                ng_index);
        }
        return;
    }

    /*
     * One candidate per child group, keyed by its position in the list so
     * the group can be looked up again after ChooseDMemItems reorders them
     */
    OpMemItem* candidates = (OpMemItem*)palloc0(sizeof(OpMemItem) * childNum);
    ListCell* cell = NULL;
    int pos = 0;

    foreach (cell, group->belowStreamGroups) {
        OperatorGroupNode* belowGroup = (OperatorGroupNode*)lfirst(cell);
        candidates[pos].id = pos;
        candidates[pos].opMem = belowGroup->ng_groupMemKBArray[ng_index].groupMemKB[0];
        pos++;
    }

    /* ChooseDMemItems returns how many leading items to use and fills in their total mem */
    int chosenMem = 0;
    int chosenNum = ChooseDMemItems(candidates, childNum, &chosenMem, belowStreamGrpDMem, decreaseMem);

    for (pos = 0; pos < chosenNum; pos++) {
        OperatorGroupNode* belowGroup = (OperatorGroupNode*)list_nth(group->belowStreamGroups, candidates[pos].id);
        AssignDMemToGroup(belowGroup, chosenMem, belowStreamGrpDMem, __FUNCTION__, ng_index);
    }

    pfree(candidates);
}

/*
 * DecreaseOpMemToNode
 *	A subroutine that assigns the memory adjustment directly to plan nodes. This only
 *	happens when we want to adjust mem on the blocked op of the current group and of
 *	blocked child groups. The candidate nodes are the blocked top nodes of children
 *	groups (childTopOps) plus the top node of the current group. In decrease mode the
 *	nodes are processed in the order produced by a COST_CMP sort, otherwise by an
 *	OPMEM_CMP sort.
 *
 * Parameters:
 *	@in group: input group node
 *	@in childTopOps: a list of all blocked op from children groups
 *	@in opDMem: the memory amount expected to decrease
 *	@in ng_index: nodegroup index
 *	@in decreaseMem: true for phase 2, and availMem is smaller than groupMem,
 *					or false for phase 3, and availMem is bigger than groupMem
 *
 * Returns: void
 */
static void DecreaseOpMemToNode(OperatorGroupNode* group, List* childTopOps, int opDMem, int ng_index, bool decreaseMem)
{
    /* If no need to decrease, return */
    if ((decreaseMem && opDMem <= 0) || (!decreaseMem && opDMem >= 0)) {
        return;
    }

    /*
     * Now we should decrease op mem of current group, put blocked op
     * of children groups first, and finally add node of current group
     * (hence the extra slot in opNum)
     */
    int opNum = list_length(childTopOps) + 1;
    OpMemItem* itemArray = (OpMemItem*)palloc0(sizeof(OpMemItem) * opNum);
    int i = 0;
    ListCell* lc = NULL;
    /* Sort key handed to OpMemItemCmp below */
    CmpMethod m = decreaseMem ? COST_CMP : OPMEM_CMP;
    int totalOpMem = 0;
    /* Remember the full request, opDMem itself is consumed in the assignment loop */
    int totalDMem = opDMem;

    foreach (lc, childTopOps) {
        Plan* plan = (Plan*)lfirst(lc);
        itemArray[i].id = i;
        itemArray[i].plan = plan;
        if (decreaseMem) {
            /* NOTE(review): defined elsewhere; presumably fills the item's cost and opMem -- confirm */
            CalcDecreaseRegressCost(&itemArray[i], opDMem);
        } else {
            itemArray[i].opMem = plan->operatorMemKB[0];
        }
        i++;
    }
    /* Last slot: top node of the current group, its id equals list_length(childTopOps) */
    itemArray[i].id = i;
    itemArray[i].plan = group->topNode;
    if (decreaseMem) {
        CalcDecreaseRegressCost(&itemArray[i], opDMem);
    } else {
        itemArray[i].opMem = group->topNode->operatorMemKB[0];
    }

    /*
     * Sort the nodes with the comparison method chosen above (COST_CMP when
     * decreasing, OPMEM_CMP otherwise), and do the assignment in that sequence
     */
    qsort_arg(itemArray, opNum, sizeof(OpMemItem), OpMemItemCmp, &m);

    /*
     * Accumulate op mem in sorted order. In decrease mode stop as soon as the
     * accumulated mem covers the request; otherwise all items are summed up.
     */
    for (i = 0; i < opNum; i++) {
        totalOpMem += itemArray[i].opMem;
        if (decreaseMem && totalOpMem >= opDMem) {
            break;
        }
    }

    for (i = 0; i < opNum; i++) {
        OperatorGroupNode* targetGroup = group;

        /* Item is a blocked top node of a children group: find the group that owns it */
        if (itemArray[i].id < list_length(childTopOps)) {
            foreach (lc, group->childGroups) {
                OperatorGroupNode* childGroup = (OperatorGroupNode*)lfirst(lc);
                if (childGroup->topNode == itemArray[i].plan) {
                    targetGroup = childGroup;
                    break;
                }
            }
            /* The search above is expected to have stopped on a match */
            Assert(lc != NULL);
        /* Item is the top node of the current group, targetGroup stays as group */
        } else {
            Assert(itemArray[i].plan == group->topNode);
        }

        if (decreaseMem) {
            /* Take at most the item's own op mem out of what is still requested */
            int dMem = Min(opDMem, itemArray[i].opMem);
            /*
             * The accumulated op mem cannot cover the whole request: add a share of
             * the shortfall through MEM_DECREASE (macro defined outside this function,
             * NOTE(review): presumably proportional to opMem / totalOpMem -- confirm)
             */
            if (totalOpMem < totalDMem && totalOpMem > 0) {
                dMem += MEM_DECREASE(itemArray[i].opMem, totalDMem - totalOpMem);
            }
            opDMem -= dMem;
            /* totalOpMem is passed as 0 here, dMem has already been computed per node */
            AssignDMemToTopNode(targetGroup, 0, dMem, __FUNCTION__, ng_index);

            /* Whole request has been handed out */
            if (opDMem <= 0) {
                break;
            }
        } else {
            /* Phase 3: every node gets the request together with the summed op mem */
            AssignDMemToTopNode(targetGroup, totalOpMem, opDMem, __FUNCTION__, ng_index);
        }
    }

    pfree(itemArray);
}

/*
 * DecreaseOpMemToGroup
 *	A subroutine that distributes a blocked-op memory adjustment between the
 *	blocked operators handled directly by this group (slot 0 of the item array)
 *	and the children groups (slots 1..n). The direct part is passed on to
 *	DecreaseOpMemToNode, the children part is recorded with AssignDMemToBlockedOp.
 *
 * Parameters:
 *	@in group: input group node
 *	@in groupOpDMem: the memory amount expected to decrease
 *	@in ng_index: nodegroup index
 *	@in decreaseMem: true for phase 2, and availMem is smaller than groupMem,
 *					or false for phase 3, and availMem is bigger than groupMem
 *
 * Returns: void
 */
static void DecreaseOpMemToGroup(OperatorGroupNode* group, int groupOpDMem, int ng_index, bool decreaseMem)
{
    /* Starts with the group's own op mem; blocked child top nodes are added in the loop below */
    int groupMem = group->ng_groupMemKBArray[ng_index].groupOpMemKB[0];
    /* One slot per child group plus slot 0 for the directly handled blocked ops */
    int groupNum = list_length(group->childGroups) + 1;
    /* Children occupy slots from 1 on, so child k of the list has id k + 1 */
    int i = 1;
    ListCell* lc = NULL;
    List* childTopOps = NIL;
    Plan* node = group->topNode;
    OpMemItem* itemArray = (OpMemItem*)palloc0(sizeof(OpMemItem) * groupNum);
    CmpMethod m = OPMEM_CMP;

    foreach (lc, group->childGroups) {
        OperatorGroupNode* childGroup = (OperatorGroupNode*)lfirst(lc);

        itemArray[i].id = i;

        /* for hashjoin, the block op of right tree will not affect above group, so ignore it */
        if (i == RIGHTTREE_POS + 1 && IsRequiredHashJoin(node, true)) {
            itemArray[i].opMem = 0;
        /* if top node is blocked, just count the top node, or count all the child blocked op */
        } else if (IsTopNodeBlockedOp(childGroup)) {
            bool unBlockedOp = false;

            /*
             * For mergejoin and nestloop, possible materialize node on the right tree
             * is not actually a blocked operator, so we should identify it
             */
            if (i == RIGHTTREE_POS + 1) {
                unBlockedOp = IsJoinRightTreeUnBlocked(node);
            }

            if (unBlockedOp) {
                itemArray[i].opMem = childGroup->ng_groupMemKBArray[ng_index].groupBlockedOpMemKB[0];
            } else {
                /*
                 * Really blocked top node: handle it together with this group's own
                 * op (slot 0); the child's slot keeps opMem 0 from palloc0
                 */
                childTopOps = lappend(childTopOps, childGroup->topNode);
                groupMem += childGroup->topNode->ng_operatorMemKBArray[ng_index][0];
            }
        } else {
            itemArray[i].opMem = childGroup->ng_groupMemKBArray[ng_index].groupBlockedOpMemKB[0];
        }
        i++;
    }

    /* Add blocked op mem at the first position */
    itemArray[0].opMem = groupMem;
    itemArray[0].id = 0;
    qsort_arg(itemArray, groupNum, sizeof(OpMemItem), OpMemItemCmp, &m);

    /* ChooseDMemItems returns how many leading items to use and fills in their total op mem */
    int totalOpMem = 0;
    groupNum = ChooseDMemItems(itemArray, groupNum, &totalOpMem, groupOpDMem, decreaseMem);

    for (i = 0; i < groupNum; i++) {
        /* For the blocked op mem deduction, just do it */
        if (itemArray[i].id == 0) {
            /* totalOpMem should not be zero, we add it for fix warnings */
            if (totalOpMem != 0) {
                /* Slot 0 receives its share groupMem / totalOpMem of the requested amount */
                DecreaseOpMemToNode(
                    group, childTopOps, (int)((double)groupMem / totalOpMem * groupOpDMem), ng_index, decreaseMem);
            }
        /* For the group blocked op deduction, just record it */
        } else {
            /* ids of children are shifted by one because of slot 0 */
            OperatorGroupNode* childGroup = (OperatorGroupNode*)list_nth(group->childGroups, itemArray[i].id - 1);

#ifdef USE_ASSERT_CHECKING
            /* A blocked right-tree top node can only carry op mem here when it was judged unblocked above */
            if (IsTopNodeBlockedOp(childGroup) && itemArray[i].id == RIGHTTREE_POS + 1 && itemArray[i].opMem != 0) {
                Assert(IsJoinRightTreeUnBlocked(node));
            }
#endif

            AssignDMemToBlockedOp(childGroup, totalOpMem, groupOpDMem, __FUNCTION__, ng_index, true);
        }
    }
    pfree(itemArray);
    list_free_ext(childTopOps);
}

/*
 * DecreaseOpMem
 *	Memory adjust logic on blocked operator mem quota. The desired amount is
 *	assigned to the op of this group or to the children groups.
 *
 * Parameters:
 *	@in group: input group node
 *	@in ng_index: nodegroup index
 *	@in decreaseMem: true for phase 2, and availMem is smaller than groupMem,
 *					or false for phase 3, and availMem is bigger than groupMem
 *
 * Returns: void
 */
static void DecreaseOpMem(OperatorGroupNode* group, int ng_index, bool decreaseMem)
{
    /*
     * The amount is the bigger of the op decrease already recorded on the group
     * and the part of blocked op mem that exceeds what the group keeps after
     * its pending group decrease
     */
    int groupOpDMem = Max(group->ng_groupMemKBArray[ng_index].groupOpDMemKB,
        group->ng_groupMemKBArray[ng_index].groupBlockedOpMemKB[0] -
            (group->ng_groupMemKBArray[ng_index].groupMemKB[0] - group->ng_groupMemKBArray[ng_index].decreaseMemKB));

    /* The sign of the amount has to match the requested direction, else nothing to do */
    if (decreaseMem) {
        if (groupOpDMem <= 0) {
            return;
        }
    } else if (groupOpDMem >= 0) {
        return;
    }

    MEMCTL_LOG("[DecreaseOpMem] Group %d expects to decrease op mem: %dKB(%d)", group->groupId, groupOpDMem, ng_index);

    /* Everything but append goes through the general group distribution */
    if (!IsAppendNode(group->topNode)) {
        DecreaseOpMemToGroup(group, groupOpDMem, ng_index, decreaseMem);
        return;
    }

    /*
     * Since append children groups are exclusive, there's only one compare
     * item, so the full amount is assigned to each child group separately.
     * The append node itself is a memory non-intensive op and is skipped.
     */
    ListCell* cell = NULL;

    foreach (cell, group->childGroups) {
        OperatorGroupNode* appendChild = (OperatorGroupNode*)lfirst(cell);

        AssignDMemToBlockedOp(appendChild, 0, groupOpDMem, __FUNCTION__, ng_index);
    }
}

/*
 * AssignDConcMemForBuild
 *	A subroutine that does the concurrent memory adjustment for the join
 *	when building results on the target sub branch. In this case the group
 *	memory of the build side and the below stream groups of the other
 *	(probe) side are consumed concurrently, so the amount is spread over
 *	them in proportion.
 *
 * Parameters:
 *	@in buildGroup: sub group of build side
 *	@in probeGroup: sub group of probe side
 *	@in joinDMem: how much memory is expected to decrease
 *	@in ng_index: nodegroup index
 *	@in decreaseMem: true for phase 2, and availMem is smaller than groupMem,
 *					or false for phase 3, and availMem is bigger than groupMem
 *
 * Returns: void
 */
static void AssignDConcMemForBuild(
    OperatorGroupNode* buildGroup, OperatorGroupNode* probeGroup, int joinDMem, int ng_index, bool decreaseMem)
{
    /* Ids below this value are below stream groups, the value itself stands for the build group */
    int belowStreamNum = list_length(probeGroup->belowStreamGroups);
    int candidateNum = belowStreamNum + 1;
    OpMemItem* candidates = (OpMemItem*)palloc0(sizeof(OpMemItem) * candidateNum);
    ListCell* cell = NULL;
    int pos = 0;

    /* Below stream groups of the probe side come first ... */
    foreach (cell, probeGroup->belowStreamGroups) {
        OperatorGroupNode* belowGroup = (OperatorGroupNode*)lfirst(cell);
        candidates[pos].id = pos;
        candidates[pos].opMem = belowGroup->ng_groupMemKBArray[ng_index].groupMemKB[0];
        pos++;
    }

    /* ... and the build group takes the last slot */
    candidates[pos].id = pos;
    candidates[pos].opMem = buildGroup->ng_groupMemKBArray[ng_index].groupMemKB[0];

    /* ChooseDMemItems returns how many leading items to use and fills in their total mem */
    int chosenMem = 0;
    int chosenNum = ChooseDMemItems(candidates, candidateNum, &chosenMem, joinDMem, decreaseMem);

    /* Adjust mem in proportion of group mem */
    for (pos = 0; pos < chosenNum; pos++) {
        OperatorGroupNode* targetGroup = buildGroup;

        if (candidates[pos].id != belowStreamNum) {
            targetGroup = (OperatorGroupNode*)list_nth(probeGroup->belowStreamGroups, candidates[pos].id);
        }
        AssignDMemToGroup(targetGroup, chosenMem, joinDMem, __FUNCTION__, ng_index);
    }

    pfree(candidates);
}

/*
 * AssignDConcMemForProbe
 *	A subroutine that does the concurrent memory adjustment for the join
 *	when probing results on the target sub branch. In this case the group
 *	memory of the probe side and the blocked op of the build side are
 *	consumed concurrently, so the amount is spread over them in proportion.
 *
 * Parameters:
 *	@in buildGroup: sub group of build side
 *	@in probeGroup: sub group of probe side
 *	@in joinDMem: how much memory is expected to decrease
 *	@in ng_index: nodegroup index
 *	@in decreaseMem: true for phase 2, and availMem is smaller than groupMem,
 *					or false for phase 3, and availMem is bigger than groupMem
 *
 * Returns: void
 */
static void AssignDConcMemForProbe(
    OperatorGroupNode* buildGroup, OperatorGroupNode* probeGroup, int joinDMem, int ng_index, bool decreaseMem)
{
    int candidateNum = RIGHTTREE_POS + 1;
    OpMemItem* candidates = (OpMemItem*)palloc0(sizeof(OpMemItem) * candidateNum);

    /* id 0: whole probe group, id 1: blocked op of the build group */
    candidates[0].id = 0;
    candidates[0].opMem = probeGroup->ng_groupMemKBArray[ng_index].groupMemKB[0];
    candidates[1].id = 1;
    candidates[1].opMem = GetBlockedOpMem(buildGroup, ng_index);

    /* ChooseDMemItems returns how many leading items to use and fills in their total mem */
    int chosenMem = 0;
    int chosenNum = ChooseDMemItems(candidates, candidateNum, &chosenMem, joinDMem, decreaseMem);

    /* Adjust mem in proportion of group mem */
    int pos = 0;
    while (pos < chosenNum) {
        if (candidates[pos].id != 0) {
            AssignDMemToBlockedOp(buildGroup, chosenMem, joinDMem, __FUNCTION__, ng_index);
        } else {
            AssignDMemToGroup(probeGroup, chosenMem, joinDMem, __FUNCTION__, ng_index);
        }
        pos++;
    }

    pfree(candidates);
}

/*
 * AssignDConcMemForHashJoinBuild
 *	A subroutine that does the concurrent memory adjustment for the hashjoin
 *	when building the hash table. In this case the blocked op memory of the
 *	build side, the hash table and the below stream group mem of the probe
 *	side are consumed concurrently, so the amount is spread over them in
 *	proportion.
 *
 * Parameters:
 *	@in group: group with hashjoin top node
 *	@in joinDMem: how much memory is expected to decrease
 *	@in ng_index: nodegroup index
 *	@in decreaseMem: true for phase 2, and availMem is smaller than groupMem,
 *					or false for phase 3, and availMem is bigger than groupMem
 *
 * Returns: void
 */
static void AssignDConcMemForHashJoinBuild(OperatorGroupNode* group, int joinDMem, int ng_index, bool decreaseMem)
{
/* Two extra candidates besides the below stream groups: hash table and build side blocked op */
#define NUM_OTHER_GROUP 2
    OperatorGroupNode* probeGroup = (OperatorGroupNode*)linitial(group->childGroups);
    OperatorGroupNode* buildGroup = (OperatorGroupNode*)lsecond(group->childGroups);
    int belowStreamNum = list_length(probeGroup->belowStreamGroups);
    int candidateNum = belowStreamNum + NUM_OTHER_GROUP;
    OpMemItem* candidates = (OpMemItem*)palloc0(sizeof(OpMemItem) * candidateNum);
    ListCell* cell = NULL;
    int pos = 0;

    /* Ids [0, belowStreamNum): below stream groups of the probe side */
    foreach (cell, probeGroup->belowStreamGroups) {
        OperatorGroupNode* belowGroup = (OperatorGroupNode*)lfirst(cell);
        candidates[pos].id = pos;
        candidates[pos].opMem = belowGroup->ng_groupMemKBArray[ng_index].groupMemKB[0];
        pos++;
    }

    /* Id belowStreamNum: hash table mem, i.e. op mem of the hashjoin top node */
    candidates[pos].id = pos;
    candidates[pos].opMem = GetTopNodeMemKB(group, ng_index);

    /* Id belowStreamNum + 1: blocked op mem of the build side */
    candidates[pos + 1].id = pos + 1;
    candidates[pos + 1].opMem = GetBlockedOpMem(buildGroup, ng_index);

    /* ChooseDMemItems returns how many leading items to use and fills in their total mem */
    int chosenMem = 0;
    int chosenNum = ChooseDMemItems(candidates, candidateNum, &chosenMem, joinDMem, decreaseMem);

    for (pos = 0; pos < chosenNum; pos++) {
        int itemId = candidates[pos].id;

        if (itemId == belowStreamNum + 1) {
            /* Adjust mem on blocked op mem of build side */
            AssignDMemToBlockedOp(buildGroup, chosenMem, joinDMem, __FUNCTION__, ng_index);
        } else if (itemId == belowStreamNum) {
            /* Adjust mem on hash table (hashjoin node opmem) */
            AssignDMemToTopNode(group, chosenMem, joinDMem, __FUNCTION__, ng_index);
        } else {
            /* Adjust mem on below stream groups of probe side */
            OperatorGroupNode* belowGroup = (OperatorGroupNode*)list_nth(probeGroup->belowStreamGroups, itemId);
            AssignDMemToGroup(belowGroup, chosenMem, joinDMem, __FUNCTION__, ng_index);
        }
    }

    pfree(candidates);
}

/*
 * AssignDConcMemForHashJoinProbe
 *	A subroutine that does the concurrent memory adjustment for the hashjoin
 *	when probing results on the probe branch. In this case the group memory
 *	of the probe side and the hash table are consumed concurrently, so the
 *	amount is spread over them in proportion.
 *
 * Parameters:
 *	@in group: group with hashjoin top node
 *	@in joinDMem: how much memory is expected to decrease
 *	@in ng_index: nodegroup index
 *	@in decreaseMem: true for phase 2, and availMem is smaller than groupMem,
 *					or false for phase 3, and availMem is bigger than groupMem
 *
 * Returns: void
 */
static void AssignDConcMemForHashJoinProbe(OperatorGroupNode* group, int joinDMem, int ng_index, bool decreaseMem)
{
    OperatorGroupNode* probeGroup = (OperatorGroupNode*)linitial(group->childGroups);
    int candidateNum = RIGHTTREE_POS + 1;
    OpMemItem* candidates = (OpMemItem*)palloc0(sizeof(OpMemItem) * candidateNum);

    /* id 0: hash table (op mem of the hashjoin top node), id 1: whole probe side group */
    candidates[0].id = 0;
    candidates[0].opMem = GetTopNodeMemKB(group, ng_index);
    candidates[1].id = 1;
    candidates[1].opMem = probeGroup->ng_groupMemKBArray[ng_index].groupMemKB[0];

    /* ChooseDMemItems returns how many leading items to use and fills in their total mem */
    int chosenMem = 0;
    int chosenNum = ChooseDMemItems(candidates, candidateNum, &chosenMem, joinDMem, decreaseMem);

    /* Hand the amount to every chosen candidate */
    int pos = 0;
    while (pos < chosenNum) {
        if (candidates[pos].id != 0) {
            AssignDMemToGroup(probeGroup, chosenMem, joinDMem, __FUNCTION__, ng_index);
        } else {
            AssignDMemToTopNode(group, chosenMem, joinDMem, __FUNCTION__, ng_index);
        }
        pos++;
    }

    pfree(candidates);
}

/*
 * AssignDecreasedJoinConcMem
 *	A subroutine that does the concurrent memory adjustment for join.
 *	In this function, three types of joins are all considered. For every
 *	execution phase of the join, the phase's max memory is compared with the
 *	target (the biggest phase max minus joinDMem), and only the phases that
 *	are on the wrong side of the target get an adjustment assigned.
 *
 * Parameters:
 *	@in group: group with join as top node
 *	@in joinDMem: how much memory is expected to decrease
 *	@in buildMaxMem: max memory usage during build phase
 *	@in probeMaxMem: max memory usage during probe phase
 *	@in build0MaxMem: only used by hashjoin, max memory usage
 *			during scanning the build tree, before hashtable building
 *	@in ng_index: nodegroup index
 *	@in decreaseMem: true for phase 2, and availMem is smaller than groupMem,
 *					or false for phase 3, and availMem is bigger than groupMem
 *
 * Returns: void
 */
static void AssignDecreasedJoinConcMem(OperatorGroupNode* group, int joinDMem, int buildMaxMem, int probeMaxMem,
    int build0MaxMem, int ng_index, bool decreaseMem)
{
    Plan* node = group->topNode;
    int joinConcMem = Max(Max(buildMaxMem, probeMaxMem), build0MaxMem);
    OperatorGroupNode* leftTreeGroup = (OperatorGroupNode*)linitial(group->childGroups);
    OperatorGroupNode* rightTreeGroup = (OperatorGroupNode*)lsecond(group->childGroups);

    /* The sign of the amount has to match the requested direction, else nothing to do */
    if ((decreaseMem && joinDMem <= 0) || (!decreaseMem && joinDMem >= 0)) {
        return;
    }

    MEMCTL_LOG("Group %d expects to decrease conc mem: %dKB(%d) for join", group->groupId, joinDMem, ng_index);

    /* Target every phase is compared with: the biggest phase max once joinDMem is applied */
    int joinMinMem = joinConcMem - joinDMem;
    /* Distance of one phase from the target; positive means the phase is above it */
    int partialJoinDMem = 0;

    if (IsRequiredHashJoin(node, true)) {
        /*
         * when executing right tree, the top memory includes the total mem of right tree,
         * and below stream part of lefttree
         */
        partialJoinDMem = build0MaxMem - joinMinMem;
        if ((decreaseMem && partialJoinDMem > 0) || (!decreaseMem && partialJoinDMem < 0)) {
            MEMCTL_LOG("Group %d hashjoin expects to decrease conc mem %dKB(%d) "
                       "during executing right tree",
                group->groupId,
                partialJoinDMem,
                ng_index);

            AssignDConcMemForBuild(rightTreeGroup, leftTreeGroup, partialJoinDMem, ng_index, decreaseMem);
        }

        /*
         * when building hash table, the top memory includes the blocked op of right tree,
         * hash join op and below stream part of lefttree
         */
        partialJoinDMem = buildMaxMem - joinMinMem;
        if ((decreaseMem && partialJoinDMem > 0) || (!decreaseMem && partialJoinDMem < 0)) {
            MEMCTL_LOG("Group %d hashjoin expects to decrease conc mem: %dKB(%d) "
                       "during building hashtable",
                group->groupId,
                partialJoinDMem,
                ng_index);

            AssignDConcMemForHashJoinBuild(group, partialJoinDMem, ng_index, decreaseMem);
        }

        /*
         * when probing lefttree, the top memory includes whole lefttree and the hash table
         */
        partialJoinDMem = probeMaxMem - joinMinMem;
        if ((decreaseMem && partialJoinDMem > 0) || (!decreaseMem && partialJoinDMem < 0)) {
            MEMCTL_LOG("Group %d hashjoin expects to decrease conc mem %dKB(%d) "
                       "during probing left tree",
                group->groupId,
                partialJoinDMem,
                ng_index);

            AssignDConcMemForHashJoinProbe(group, partialJoinDMem, ng_index, decreaseMem);
        }
    } else if (IsA(node, MergeJoin) || IsA(node, VecMergeJoin)) {
        /*
         * when scanning the lefttree, the top memory includes the top mem of left tree,
         * and below stream part of right tree
         */
        partialJoinDMem = buildMaxMem - joinMinMem;
        if ((decreaseMem && partialJoinDMem > 0) || (!decreaseMem && partialJoinDMem < 0)) {
            MEMCTL_LOG("Group %d mergejoin expects to decrease conc mem %dKB(%d) "
                       "during scanning left tree",
                group->groupId,
                partialJoinDMem,
                ng_index);

            AssignDConcMemForBuild(leftTreeGroup, rightTreeGroup, partialJoinDMem, ng_index, decreaseMem);
        }

        /*
         * When scan the right tree, the top memory includes the blocked op of lefttree and whole right tree
         */
        partialJoinDMem = probeMaxMem - joinMinMem;
        if ((decreaseMem && partialJoinDMem > 0) || (!decreaseMem && partialJoinDMem < 0)) {
            MEMCTL_LOG("Group %d mergejoin expects to decrease conc mem %dKB(%d) "
                       "during scanning right tree",
                group->groupId,
                partialJoinDMem,
                ng_index);

            AssignDConcMemForProbe(leftTreeGroup, rightTreeGroup, partialJoinDMem, ng_index, decreaseMem);
        }
    } else {
        /*
         * Remaining join type (nestloop, going by the log text below).
         * when executing right tree, the top memory includes the total mem of right tree,
         * and below stream part of lefttree.
         * The second half of the test must use !decreaseMem like every other phase:
         * in phase 3 a phase below the target is the one that gets an adjustment.
         */
        partialJoinDMem = buildMaxMem - joinMinMem;
        if ((decreaseMem && partialJoinDMem > 0) || (!decreaseMem && partialJoinDMem < 0)) {
            MEMCTL_LOG("Group %d nestloop expects to decrease conc mem %dKB(%d) "
                       "during scanning right tree",
                group->groupId,
                partialJoinDMem,
                ng_index);

            AssignDConcMemForBuild(rightTreeGroup, leftTreeGroup, partialJoinDMem, ng_index, decreaseMem);
        }

        /*
         * when probing lefttree, the top memory includes whole lefttree and materialize
         */
        partialJoinDMem = probeMaxMem - joinMinMem;
        if ((decreaseMem && partialJoinDMem > 0) || (!decreaseMem && partialJoinDMem < 0)) {
            MEMCTL_LOG("Group %d nestloop expects to decrease conc mem %dKB(%d) "
                       "during scanning left tree",
                group->groupId,
                partialJoinDMem,
                ng_index);

            AssignDConcMemForProbe(rightTreeGroup, leftTreeGroup, partialJoinDMem, ng_index, decreaseMem);
        }
    }
}

/*
 * AssignConcDecreaseMemToChild
 *	Memory adjust logic on concurrent mem quota. We just assign the
 *	desired memory adjust amount considering all kinds of concurrent
 *	status
 *
 * Parameters:
 *	@in group: input group node (for a blocked join node the first two
 *			children are treated as the join branches)
 *	@in ng_index: nodegroup index
 *	@in decreaseMem: true for phase 2, and availMem is smaller than groupMem,
 *					or false for phase 3, and availMem is bigger than groupMem
 *
 * Returns: void
 */
static void AssignConcDecreaseMemToChild(OperatorGroupNode* group, int ng_index, bool decreaseMem)
{
    ListCell* lc = NULL;
    /* Concurrent mem minus what the group keeps once its pending decrease is applied */
    int groupConcDMem =
        group->ng_groupMemKBArray[ng_index].groupConcMemKB[0] -
        (group->ng_groupMemKBArray[ng_index].groupMemKB[0] - group->ng_groupMemKBArray[ng_index].decreaseMemKB);
    Plan* node = group->topNode;

    /* The sign of the amount has to match the requested direction, else nothing to do */
    if ((decreaseMem && groupConcDMem <= 0) || (!decreaseMem && groupConcDMem >= 0)) {
        return;
    }

    MEMCTL_LOG("[AssignConcDecreaseMemToChild] Group %d expects to decrease conc mem: %dKB(%d)",
        group->groupId,
        groupConcDMem,
        ng_index);

    /* For append node, each child groups are exclusive, so just decrease them separately */
    if (IsAppendNode(node)) {
        int belowStreamGrpDMem =
            group->ng_groupMemKBArray[ng_index].belowStreamConcMemKB[0] -
            (group->ng_groupMemKBArray[ng_index].groupMemKB[0] - group->ng_groupMemKBArray[ng_index].decreaseMemKB);

        /*
         * Get rid of below stream decrease mem as we only
         * consider more added mem by above stream group
         */
        groupConcDMem -= belowStreamGrpDMem;
        if (groupConcDMem > 0) {
            foreach (lc, group->childGroups) {
                OperatorGroupNode* childGroup = (OperatorGroupNode*)lfirst(lc);
                AssignDMemToGroup(childGroup, 0, groupConcDMem, __FUNCTION__, ng_index);
            }
        }
    } else {
        /* child can coexist, so adds them up into comparison */
        int groupNum = list_length(group->childGroups);
        OpMemItem* itemArray = (OpMemItem*)palloc0(sizeof(OpMemItem) * groupNum);
        int i = 0;
        int build0MaxMem = 0;
        int buildMaxMem = 0;
        int probeMaxMem = 0;
        int joinConcMem = IsBlockedJoinNode(node)
                              ? CalcJoinConcMem(group, &buildMaxMem, &probeMaxMem, &build0MaxMem, ng_index, NULL)
                              : 0;

        foreach (lc, group->childGroups) {
            itemArray[i].id = i;

            /*
             * For join branches, we only record join concurrent mem
             * in first place, and skip the second time. If later join
             * concurrent memory should be decreased, we call other
             * routines to do it
             */
            if (IsBlockedJoinNode(node) && i <= RIGHTTREE_POS) {
                if (i == 0) {
                    itemArray[i].opMem = joinConcMem;
                }
                /*
                 * Move on to the next slot before skipping. Without this the
                 * index stays at 0, every child rewrites slot 0 and the other
                 * slots keep id 0, so the join item would show up several times.
                 */
                i++;
                continue;
            }

            /*
             * Get rid of below stream decrease mem as we only
             * consider more added mem by above stream group
             */
            OperatorGroupNode* childGroup = (OperatorGroupNode*)lfirst(lc);
            int belowStreamGrpDMem = Max(childGroup->ng_groupMemKBArray[ng_index].belowStreamConcMemKB[0] -
                                             (childGroup->ng_groupMemKBArray[ng_index].groupMemKB[0] -
                                                 childGroup->ng_groupMemKBArray[ng_index].decreaseMemKB),
                0);
            itemArray[i].opMem = childGroup->ng_groupMemKBArray[ng_index].groupMemKB[0] - belowStreamGrpDMem;
            i++;
        }

        /*
         * ChooseDMemItems returns how many leading items to use and fills in their
         * total mem. The local must keep the name totalOpMem:
         * NOTE(review): MEM_DECREASE below presumably refers to it -- confirm.
         */
        int totalOpMem = 0;
        groupNum = ChooseDMemItems(itemArray, groupNum, &totalOpMem, groupConcDMem, decreaseMem);

        for (i = 0; i < groupNum; i++) {
            /* For join branches, decrease join concurrent memory usage (only the id 0 item carries it) */
            if (IsBlockedJoinNode(node)) {
                if (itemArray[i].id == 0 && totalOpMem != 0) {
                    AssignDecreasedJoinConcMem(group,
                        MEM_DECREASE(joinConcMem, groupConcDMem),
                        buildMaxMem,
                        probeMaxMem,
                        build0MaxMem,
                        ng_index,
                        decreaseMem);
                }
                continue;
            }

            /* Decrease group memory usage to child group */
            OperatorGroupNode* childGroup = (OperatorGroupNode*)list_nth(group->childGroups, itemArray[i].id);
            int groupDMem = MEM_DECREASE(itemArray[i].opMem, groupConcDMem);
            AssignDMemToGroup(childGroup, 0, groupDMem, __FUNCTION__, ng_index);
        }

        pfree(itemArray);
    }
}

/* ---------------------------------------------------------- */
/*               memory comparison and calculation routines                      */
/* ---------------------------------------------------------- */
/*
 * AdjustMemForEstimationCrisis
 *	Lower the operator memory usage of a plan node if we find there's a
 *	big estimation gap, i.e. the assigned op mem is above MAX_REASONABLE_MEM
 *	and also above the value estimated from the children.
 *
 * Parameters:
 *	@in context: plan tree walker context
 *	@in node: plan node that needs to do memory assignment
 *	@out info: mem info given by optimizer, and if we change the memory
 *			used, we should also change max and min mem in the info
 *
 * Returns: void
 */
static void AdjustMemForEstimationCrisis(MethodPlanWalkerContext* context, Plan* node, OpMemInfo* info)
{
    int opMemOrig = node->operatorMemKB[0];

    /* Anything up to MAX_REASONABLE_MEM is left alone */
    if (opMemOrig <= MAX_REASONABLE_MEM) {
        return;
    }

    /* HALF_AMOUNT of the children's max mem, but never below STATEMENT_MIN_MEM * 1024 */
    int estOpMem = Max((int)(GetChildMaxMem(context->groupNode) * HALF_AMOUNT), STATEMENT_MIN_MEM * 1024L);

    /* The estimate is only ever used to lower the op mem */
    if (estOpMem >= opMemOrig) {
        return;
    }

    node->operatorMemKB[0] = estOpMem;

    /* In tenant scenario, we don't allow auto spread of operator mem */
    if (!context->use_tenant) {
        node->operatorMaxMem = node->operatorMemKB[0];
    }

    /* Keep the optimizer's mem info in line with the lowered op mem */
    info->minMem = Min(estOpMem / HASH_MAX_DISK_SIZE, info->minMem);
    info->maxMem = estOpMem;

    MEMCTL_LOG("Node %d adjust memory usage to %dKB from %dKB", node->plan_node_id, estOpMem, opMemOrig);
}

/*
 * AssignMemOpConsumption
 *     During first traverse phase, assign memory usage of the plan node
 *     from the memory info generated by optimizer. Optimistically, we use
 *     max mem since we don't want disk spill.
 *
 *     The function
 *       1. allocates node->ng_operatorMemKBArray (ng_num rows of 2 ints),
 *       2. sets node->operatorMemKB[0] according to the node type, passing
 *          suspicious estimates to AdjustMemForEstimationCrisis,
 *       3. derives node->operatorMemKB[1] for operators above a threshold,
 *       4. copies both values into the per-nodegroup slots, and
 *       5. adds them to the memory of the current operator group.
 *
 * Parameters:
 *     @in context: plan tree walker context
 *     @in node: plan node that needs to do memory assignment
 *
 * Returns: void
 */
static void AssignMemOpConsumption(MethodPlanWalkerContext* context, Plan* node)
{
/*
 * The helper macros below are expanded inside the switch cases and rely on a
 * case-local variable named "plan" that points to the node-specific struct
 * carrying mem_info.
 *
 * ADJ_HUGE_MEM: if estimation is too huge, then adjust it. The limit is
 * MAX_OP_MEM, doubled for T_VecModifyTable. The adjustment is requested when
 * the group's childLevel is above 1 and the operator memory reaches the limit,
 * or when the operator memory is negative.
 * NOTE(review): AdjustMemForEstimationCrisis returns immediately when
 * operatorMemKB[0] <= MAX_REASONABLE_MEM, so (assuming that constant is not
 * negative) the "< 0" trigger looks like a no-op -- confirm.
 */
#define ADJ_HUGE_MEM                                                                                                \
    int max_op_mem = MAX_OP_MEM;                                                                                    \
    if (nodeTag(node) == T_VecModifyTable)                                                                          \
        max_op_mem = MAX_OP_MEM * 2;                                                                                \
    if ((context->groupNode->childLevel > 1 && node->operatorMemKB[0] >= max_op_mem) || node->operatorMemKB[0] < 0) \
        AdjustMemForEstimationCrisis(context, node, &plan->mem_info);
/*
 * ASS_AND_ADJ_MEM: take the optimizer's maxMem (clamped to INT_MAX) as the
 * operator memory; when adjMem is positive and the group's childLevel is at
 * least MAX_ACCURATE_LEVEL, ask for an estimation adjustment; finally apply
 * ADJ_HUGE_MEM.
 */
#define ASS_AND_ADJ_MEM(adjMem)                                               \
    node->operatorMemKB[0] = (int)Min(plan->mem_info.maxMem, INT_MAX);        \
    if ((adjMem) > 0 && context->groupNode->childLevel >= MAX_ACCURATE_LEVEL) \
        AdjustMemForEstimationCrisis(context, node, &plan->mem_info);         \
    ADJ_HUGE_MEM;
/*
 * ASS_AND_ADJ_AGG_MEM: same as ASS_AND_ADJ_MEM, but the adjustment is
 * triggered by AGG_INACCURATE(plan, node) instead of a caller-supplied value.
 */
#define ASS_AND_ADJ_AGG_MEM                                                                 \
    node->operatorMemKB[0] = (int)Min(plan->mem_info.maxMem, INT_MAX);                      \
    if (AGG_INACCURATE(plan, node) && context->groupNode->childLevel >= MAX_ACCURATE_LEVEL) \
        AdjustMemForEstimationCrisis(context, node, &plan->mem_info);                       \
    ADJ_HUGE_MEM;

    OperatorGroupNode* group = context->groupNode;
    /* keep the same as OperatorGroupNode->ng_groupMemKBArray */
    node->ng_num = context->ng_num;
    /* one zero-initialized {operatorMemKB[0], operatorMemKB[1]} pair per nodegroup */
    node->ng_operatorMemKBArray = (int**)palloc0(node->ng_num * sizeof(int*));
    for (int i = 0; i < node->ng_num; i++) {
        node->ng_operatorMemKBArray[i] = (int*)palloc0(2 * sizeof(int));
    }

    switch (nodeTag(node)) {
        case T_Stream:
        case T_VecStream: {
            /* a stream gets no operator memory itself and inherits the spread limit of its child */
            node->operatorMemKB[0] = 0;
            node->operatorMaxMem = node->lefttree->operatorMaxMem;

            /* stream node is special because it can cross two nodegroups */
            GetStreamMemConsumption(node, context);
        } break;
        case T_Material:
        case T_VecMaterial: {
            Material* plan = (Material*)node;
            /* adjust only when the child has a positive operatorMaxMem */
            ASS_AND_ADJ_MEM(node->lefttree->operatorMaxMem);
        } break;
        case T_Sort:
        case T_VecSort: {
            Sort* plan = (Sort*)node;
            ASS_AND_ADJ_MEM(node->lefttree->operatorMaxMem);
        } break;
        case T_HashJoin:
        case T_VecHashJoin: {
            /*
             * hashjoin of row engine will pre-calculate nbatches, so
             * we'd better not adjust it; the argument is therefore non-zero
             * only for VecHashJoin whose right child has a non-zero
             * operatorMaxMem
             */
            HashJoin* plan = (HashJoin*)node;
            ASS_AND_ADJ_MEM(IsA(plan, VecHashJoin) && node->righttree->operatorMaxMem);
        } break;
        case T_SetOp:
        case T_VecSetOp: {
            SetOp* plan = (SetOp*)node;
            /* Now only hash setop is memory intensive op */
            if (plan->strategy == SETOP_HASHED) {
                ASS_AND_ADJ_AGG_MEM;
            } else {
                node->operatorMemKB[0] = non_operator_memory;
            }
        } break;
        case T_VecAgg:
        case T_Agg: {
            Agg* plan = (Agg*)node;
            /* Now only hashagg is memory intensive op */
            if (plan->aggstrategy == AGG_HASHED) {
                if (node->vec_output) {
                    /* add the initialization memory (scaled by SET_DOP of the node's dop) to both bounds */
                    int init_agg_mem = GetHashaggInitializedMem(plan) * SET_DOP(node->dop);
                    plan->mem_info.maxMem += init_agg_mem;
                    plan->mem_info.minMem += init_agg_mem;
                }
                ASS_AND_ADJ_AGG_MEM;
            } else {
                node->operatorMemKB[0] = non_operator_memory;
            }
        } break;
        case T_ModifyTable:
        case T_VecModifyTable: {
            ModifyTable* plan = (ModifyTable*)node;
            if (plan->mem_info.maxMem > 0) {
                /* the adjustment trigger is the operatorMaxMem of the first sub-plan */
                ASS_AND_ADJ_MEM(((Plan*)lfirst(list_head(plan->plans)))->operatorMaxMem);
            } else {
                node->operatorMemKB[0] = 0;
            }
        } break;
        case T_CStoreIndexScan:
        case T_CStoreIndexHeapScan: {
            Scan* plan = (Scan*)node;
            if (plan->mem_info.maxMem > 0) {
                /* argument 0: only the ADJ_HUGE_MEM check can trigger an adjustment */
                ASS_AND_ADJ_MEM(0);
            } else {
                node->operatorMemKB[0] = non_operator_memory;
            }
        } break;
        default:
            node->operatorMemKB[0] = non_operator_memory;
            break;
    }

    /*
     * add the operator memory into group operator total memory
     * If opmem exceeds 256MB, we'll save a min mem, also save a min
     * groupOpMem.
     *
     * operatorMemKB[1] becomes the larger of the statement minimum and
     * operatorMemKB[0] * DECREASED_MIN_CMP_GAP. T_VecModifyTable uses its own
     * minimum and resets operatorMemKB[1] to 0 below the threshold; for all
     * other nodes operatorMemKB[1] is left as it was when below the threshold.
     */
    if (nodeTag(node) == T_VecModifyTable) {
        if (node->operatorMemKB[0] > STATEMETN_MIN_MODIFY_MEM * 1024L) {
            node->operatorMemKB[1] =
                Max(STATEMETN_MIN_MODIFY_MEM * 1024L, (int)(node->operatorMemKB[0] * DECREASED_MIN_CMP_GAP));
        } else {
            node->operatorMemKB[1] = 0;
        }
    } else if (node->operatorMemKB[0] > STATEMENT_MIN_MEM * 1024L) {
        node->operatorMemKB[1] = Max(STATEMENT_MIN_MEM * 1024L, (int)(node->operatorMemKB[0] * DECREASED_MIN_CMP_GAP));
    }

    /* for original case */
    node->ng_operatorMemKBArray[0][0] = node->operatorMemKB[0];
    node->ng_operatorMemKBArray[0][1] = node->operatorMemKB[1];
    /* assign left operatorMemKB for multiple logic cluster case */
    if (node->ng_num > 1) {
        /* stream node's operatorMemKB is simple 0 now */
        if (nodeTag(node) != T_Stream && nodeTag(node) != T_VecStream && node->exec_nodes &&
            node->exec_nodes->nodeList) {
            ListCell* cell = NULL;
            /* slot 0 was filled above; the distributions of the list map to slots 1, 2, ... */
            int i = 1;
            /* distribution in exec_nodes is not reliable sometime */
            Distribution* node_dist = ng_convert_to_distribution(node->exec_nodes->nodeList);
            /* data nodes of this plan node that are not yet matched to a nodegroup */
            Bitmapset* left_nodeids = node_dist->bms_data_nodeids;
            Assert(!bms_is_empty(left_nodeids));

            foreach (cell, context->ng_distributionList) {
                Distribution* dist = (Distribution*)lfirst(cell);
                Bitmapset* inter = bms_intersect(dist->bms_data_nodeids, left_nodeids);
                if (!bms_is_empty(inter)) {
                    /* the node runs on this nodegroup: drop its data nodes from the remaining set */
                    Bitmapset* tmp_nodeids = bms_difference(left_nodeids, dist->bms_data_nodeids);
                    bms_free(left_nodeids);
                    left_nodeids = tmp_nodeids;

                    node->ng_operatorMemKBArray[i][0] = node->operatorMemKB[0];
                    node->ng_operatorMemKBArray[i][1] = node->operatorMemKB[1];
                }
                bms_free(inter);

                /* no left to process */
                if (bms_is_empty(left_nodeids)) {
                    break;
                }
                i++;
            }

            /* every data node of the plan node is expected to belong to one of the listed nodegroups */
            Assert(bms_is_empty(left_nodeids));
            bms_free(left_nodeids);
            pfree(node_dist);
        }
    }

    /* accumulate the per-nodegroup operator memory into the current operator group */
    if (group != NULL) {
        for (int i = 0; i < node->ng_num; i++) {
            GroupAddMemory(group->ng_groupMemKBArray[i].groupOpMemKB, node->ng_operatorMemKBArray[i]);
        }
    }
}

/*
 * CompareOpMemFuzzy
 *     Fuzzy equality of two memory quotas: mem1 is treated as equal to mem2
 *     when it lies inside [mem2 * (1 - FUZZY_FACTOR), mem2 * (1 + FUZZY_FACTOR)],
 *     i.e. within a 1% gap of mem2. The window is built from mem2 only, so
 *     for a negative mem2 the window is empty and the result is always false.
 *
 * Parameters:
 *     @in mem1: memory quota 1.
 *     @in mem2: memory quota 2, the reference the tolerance is derived from.
 *
 * Returns: true if two quota is the same, else false.
 */
static bool CompareOpMemFuzzy(double mem1, double mem2)
{
#define FUZZY_FACTOR 0.01
    const double lowerBound = mem2 * (1 - FUZZY_FACTOR);
    const double upperBound = mem2 * (1 + FUZZY_FACTOR);

    return lowerBound <= mem1 && mem1 <= upperBound;
}

/*
 * CalOpSpreadMem
 *     Calculate how many mem can be spread by current operator. All figures
 *     are taken from slot 0 of the query and group memory arrays.
 *
 * Parameters:
 *     @in node: the plan node that need to adjust memory consumption
 *     @in context: plan tree walker context
 *
 * Returns: the memory (KB) the operator may spread to, or 0 when the group's
 *     topNodeDMemKB is unset (-INT_MAX) or its negation is not positive.
 */
static int CalOpSpreadMem(Plan* node, MethodPlanWalkerContext* context)
{
    int opSpreadMem = 0;

    if (context->groupNode->ng_groupMemKBArray[0].topNodeDMemKB != -INT_MAX) {
        opSpreadMem = -context->groupNode->ng_groupMemKBArray[0].topNodeDMemKB;
    }

    /* spreading is only considered for a strictly positive share */
    if (opSpreadMem <= 0) {
        return 0;
    }

    /* memory of the query that is still unassigned */
    int spreadMem = Max(0, context->ng_queryMemKBArray[0].availMemKB - context->ng_queryMemKBArray[0].currQueryMemKB);

    /* For current node, we allow spread as much as below stream mem */
    spreadMem += Max(0,
        context->groupNode->ng_groupMemKBArray[0].belowStreamConcMemKB[0] -
            context->groupNode->ng_groupMemKBArray[0].groupBlockedOpMemKB[0]);

    /* with at most one active statement the whole amount can be used */
    if (t_thrd.wlm_cxt.thread_climgr->active_statements <= 1) {
        return node->operatorMemKB[0] + spreadMem;
    }

    /* several active statements: hand out only a part of the spreadable memory */
    double ratio = 0;
    if (spreadMem != 0) {
        ratio = Min(1.0, (double)opSpreadMem / spreadMem);
    }
    const double decreaseRatio = 1.0 - (1.0 - ratio) * (1.0 - ratio);
    const int spreadTarget = (int)(node->operatorMemKB[0] + spreadMem * decreaseRatio);
    const int spreadFloor = (int)(STATEMENT_MIN_MEM * HALF_AMOUNT * 1024L);

    Assert(spreadTarget >= 0);
    return Max(spreadTarget, spreadFloor);
}

/*
 * AdjustMemOpConsumption
 *     The main logic entry for memory adjustment phase. We don't want
 *     the memory to be very small, and if the memory is close to disk spill,
 *     then enlarge it in some extent.
 *
 *     For memory intensive operators the work is done by the ADJ_OP_MEM
 *     macro; stream nodes are delegated to GetStreamMemConsumption and the
 *     remaining node types get a fixed value (MIN_OP_MEM, 0 or
 *     non_operator_memory).
 *
 * Parameters:
 *     @in context: plan tree walker context
 *     @in node: the plan node that need to adjust memory consumption
 *
 * Returns: void
 */
static void AdjustMemOpConsumption(MethodPlanWalkerContext* context, Plan* node)
{
/* enlarge factor used when the operator holds (about) its max mem */
#define BIG_MEM_RATIO 0.2
/* enlarge factor used when the operator holds (about) its min mem; also the "small operator" share of availMem */
#define SMALL_MEM_RATIO 0.1
/*
 * ADJ_OP_MEM relies on the locals "plan" (node-specific struct with mem_info),
 * "allow_spread" and "availMem":
 *  - operator memory within 1% of maxMem: record the spread limit when
 *    spreading is allowed, and enlarge the memory by BIG_MEM_RATIO if it is
 *    below SMALL_MEM_RATIO of availMem;
 *  - otherwise, operator memory within 1% of minMem and below SMALL_MEM_RATIO
 *    of availMem: enlarge it by SMALL_MEM_RATIO;
 *  - finally raise the memory to at least MIN_OP_MEM, recomputing the spread
 *    limit in that case.
 */
#define ADJ_OP_MEM                                                                         \
    if (CompareOpMemFuzzy((double)node->operatorMemKB[0], plan->mem_info.maxMem)) {        \
        if (allow_spread)                                                                  \
            node->operatorMaxMem = CalOpSpreadMem(node, context);                          \
        if (node->operatorMemKB[0] < availMem * SMALL_MEM_RATIO)                           \
            node->operatorMemKB[0] = (int)(node->operatorMemKB[0] * (1 + BIG_MEM_RATIO));  \
    } else if (CompareOpMemFuzzy((double)node->operatorMemKB[0], plan->mem_info.minMem) && \
               node->operatorMemKB[0] < availMem * SMALL_MEM_RATIO) {                      \
        node->operatorMemKB[0] = (int)(node->operatorMemKB[0] * (1 + SMALL_MEM_RATIO));    \
    }                                                                                      \
    if (node->operatorMemKB[0] <= MIN_OP_MEM) {                                            \
        node->operatorMemKB[0] = MIN_OP_MEM;                                               \
        if (allow_spread)                                                                  \
            node->operatorMaxMem = CalOpSpreadMem(node, context);                          \
    }

    /* auto spread of operator memory is switched off when use_tenant is set */
    bool allow_spread = !context->use_tenant;
    /* availMem of index 0 must be the smallest one */
    int availMem = context->ng_queryMemKBArray[0].availMemKB;
    MEMCTL_LOG("AdjustMemOpConsumption: Node %d, availMem %d, operatorMem %d",
        node->plan_node_id,
        availMem,
        node->operatorMemKB[0]);

    /*
     * No need to adjust operatorMemKB for each nodegroup, for it is not used in executing.
     */
    switch (nodeTag(node)) {
        case T_Stream:
        case T_VecStream:
            GetStreamMemConsumption(node, context);
            break;
        case T_Material:
        case T_VecMaterial: {
            Material* plan = (Material*)node;
            ADJ_OP_MEM;
        } break;
        case T_Sort:
        case T_VecSort: {
            Sort* plan = (Sort*)node;
            ADJ_OP_MEM;
        } break;
        case T_HashJoin:
        case T_VecHashJoin: {
            HashJoin* plan = (HashJoin*)node;
            if (IsA(plan, HashJoin)) {
                /*
                 * Row-engine hash join: hand the quota to the right child and
                 * mark the join itself as non memory intensive. "node" is
                 * re-pointed to the right child, so ADJ_OP_MEM and the log at
                 * the end of the function operate on that child, while
                 * "plan" still refers to the join's mem_info.
                 */
                plan->join.plan.righttree->operatorMemKB[0] = node->operatorMemKB[0];
                node->operatorMemKB[0] = non_operator_memory;
                node = plan->join.plan.righttree;
                /* HashJoin will assign memory usage beforehand, so always allow auto spread */
                if (allow_spread) {
                    node->operatorMaxMem = CalOpSpreadMem(node, context);
                }
            }
            ADJ_OP_MEM;
        } break;
        case T_SetOp:
        case T_VecSetOp: {
            SetOp* plan = (SetOp*)node;
            /* only the hashed strategy is adjusted */
            if (plan->strategy == SETOP_HASHED) {
                ADJ_OP_MEM;
            } else {
                node->operatorMemKB[0] = non_operator_memory;
            }
        } break;
        case T_VecAgg:
        case T_Agg: {
            Agg* plan = (Agg*)node;
            if (plan->aggstrategy == AGG_HASHED) {
                ADJ_OP_MEM;
            } else if (plan->aggstrategy == AGG_SORTED) {
                /* sorted agg with a non-empty chain above a Sort copies the Sort's quota and spread limit */
                if (IsA(node->lefttree, Sort) && plan->chain != NIL) {
                    node->operatorMemKB[0] = node->lefttree->operatorMemKB[0];
                    node->operatorMaxMem = node->lefttree->operatorMaxMem;
                } else
                    node->operatorMemKB[0] = MIN_OP_MEM;
            } else {
                node->operatorMemKB[0] = non_operator_memory;
            }
        } break;
        case T_WindowAgg:
        case T_VecWindowAgg:
            /* window aggregation always gets the minimum operator memory */
            node->operatorMemKB[0] = MIN_OP_MEM;
            break;
        case T_ModifyTable:
        case T_VecModifyTable: {
            ModifyTable* plan = (ModifyTable*)node;
            if (plan->mem_info.maxMem > 0) {
                ADJ_OP_MEM;
            } else {
                node->operatorMemKB[0] = 0;
            }
        } break;
        case T_CStoreIndexScan:
        case T_CStoreIndexHeapScan: {
            Scan* plan = (Scan*)node;
            if (plan->mem_info.maxMem > 0) {
                ADJ_OP_MEM;
            } else {
                node->operatorMemKB[0] = non_operator_memory;
            }
        } break;
        default:
            node->operatorMemKB[0] = non_operator_memory;
            break;
    }

    /* report the spread limit of the node that was finally adjusted */
    if (node->operatorMaxMem > 0) {
        MEMCTL_LOG("Node %d allows auto spread %dKB mem", node->plan_node_id, node->operatorMaxMem);
    }
}

/*
 * OpMemItemCmp
 *	Opmem unit compare routines. With the given compare method,
 *	do compare on item a and b, and return the result.
 *
 * Parameters:
 *	@in a, b: compare items
 *	@in arg: arg of compare function template, for this function it's the
 *		pointer to compare method
 *
 * Returns: 1 if a is "logically small" than b, -1 if a is "logically big" than b,
 *			0 if two is equal.
 */
static int OpMemItemCmp(const void* a, const void* b, void* arg)
{
    OpMemItem* qa = (OpMemItem*)a;
    OpMemItem* qb = (OpMemItem*)b;
    CmpMethod method = *(CmpMethod*)arg;

    /* when comparing opmem, we want from large to small */
    if (method == OPMEM_CMP) {
        if (qa->opMem > qb->opMem) {
            return -1;
        } else if (qa->opMem < qb->opMem) {
            return 1;
        } else {
            return 0;
        }
    /* when comparing cost, we want from small to large */
    } else if (method == COST_CMP) {
        if (qa->cost < qb->cost) {
            return -1;
        } else if (qa->cost > qb->cost) {
            return 1;
        } else {
            return 0;
        }
    }
    return 0;
}

/*
 * CalcDecreaseRegressCost
 *     Given the expected decrease memory amount, calculate the regression
 *     cost for each operator in comparison items, and record it in the item.
 *     For memory intensive operators item->opMem is overwritten with the
 *     amount that can actually be decreased and item->cost with the weighted
 *     regression cost; every other node gets the disable cost, and its
 *     item->opMem is left untouched.
 *
 * Parameters:
 *     @in item: comparison item, contains plan and regression cost info
 *     @in dMem: expected decrease memory amount
 *
 * Returns: void
 */
static void CalcDecreaseRegressCost(OpMemItem* item, int dMem)
{
/* This macro calculates the regression cost ratio of the max regression
 * cost. For hashjoin and materialize, all data will be spilled, so not
 * handled here (ratio stays 1.0). For sort, with more mem, regress cost will
 * be a little small since merge times will be less. For hashagg and hash
 * setop, only part of data that doesn't match data in hashtable will be
 * spilled.
 * It relies on the locals "plan", "maxDMem" and "ratio" and reads
 * item->opMem as it was on entry, i.e. before PLAN_REGRESSION_COST
 * overwrites it.
 * NOTE(review): the sort branch divides by log(maxMem - dMem), which is 0
 * when maxMem - dMem equals 1 -- confirm callers cannot reach that. */
#define PLAN_REGRESS_RATIO                                                                     \
    if (IsA(plan, Sort) || IsA(plan, VecSort)) {                                               \
        if (maxDMem > dMem)                                                                    \
            ratio = log(plan->mem_info.minMem) / log(plan->mem_info.maxMem - dMem);            \
    } else if (IsA(plan, Agg) || IsA(plan, VecAgg) || IsA(plan, SetOp) || IsA(plan, VecSetOp)) \
        ratio = (double)(plan->mem_info.maxMem - item->opMem) / plan->mem_info.maxMem;

/*
 * This macro calculates the regression cost of every plan if spilling
 * data to disk. Since we don't want small data amount to be spilled,
 * we'll make it large by multiply by a mem decrease proportion.
 * maxDMem is the most memory the operator can give up (maxMem - minMem);
 * item->opMem becomes the smaller of maxDMem (clamped to INT_MAX) and dMem.
 * NOTE(review): when maxMem equals minMem, maxDMem is 0 and dMem / maxDMem
 * is a floating-point division by zero -- confirm this cannot happen.
 */
#define PLAN_REGRESSION_COST                                        \
    double maxDMem = plan->mem_info.maxMem - plan->mem_info.minMem; \
    double ratio = 1.0;                                             \
    PLAN_REGRESS_RATIO;                                             \
    item->opMem = (int)Min(Min(maxDMem, INT_MAX), dMem);            \
    item->cost = plan->mem_info.regressCost * (ratio)*Min(1.0, (double)dMem / maxDMem);

    Plan* node = item->plan;

    /* each case only binds "plan" to the right struct type so that the macro can reach mem_info */
    switch (nodeTag(node)) {
        case T_Material:
        case T_VecMaterial: {
            Material* plan = (Material*)node;
            PLAN_REGRESSION_COST;
        } break;
        case T_Sort:
        case T_VecSort: {
            Sort* plan = (Sort*)node;
            PLAN_REGRESSION_COST;
        } break;
        case T_HashJoin:
        case T_VecHashJoin: {
            HashJoin* plan = (HashJoin*)node;
            PLAN_REGRESSION_COST;
        } break;
        case T_SetOp:
        case T_VecSetOp: {
            SetOp* plan = (SetOp*)node;
            /* only the hashed strategy takes part in memory decrease */
            if (plan->strategy == SETOP_HASHED) {
                PLAN_REGRESSION_COST;
            } else {
                item->cost = g_instance.cost_cxt.disable_cost;
            }
        } break;
        case T_VecAgg:
        case T_Agg: {
            Agg* plan = (Agg*)node;
            if (plan->aggstrategy == AGG_HASHED) {
                PLAN_REGRESSION_COST;
            } else {
                item->cost = g_instance.cost_cxt.disable_cost;
            }
        } break;
        case T_ModifyTable:
        case T_VecModifyTable: {
            ModifyTable* plan = (ModifyTable*)node;
            if (plan->mem_info.maxMem > 0) {
                PLAN_REGRESSION_COST;
            } else {
                item->cost = g_instance.cost_cxt.disable_cost;
            }
        } break;
        case T_CStoreIndexScan:
        case T_CStoreIndexHeapScan: {
            Scan* plan = (Scan*)node;
            if (plan->mem_info.maxMem > 0) {
                PLAN_REGRESSION_COST;
            } else {
                item->cost = g_instance.cost_cxt.disable_cost;
            }
        } break;
        default:
            /* operators without a memory quota are given the disable cost */
            item->cost = g_instance.cost_cxt.disable_cost;
            break;
    }
}

/*
 * ChooseDMemItems
 *     Sort the comparison items by opMem from large to small and find the
 *     boundary that separates the big items from the small ones, so that
 *     the memory change is applied to the big items only.
 *
 * Parameters:
 *     @in itemArray: comparison item array, sorted in place
 *     @in groupNum: total number of items in the array
 *     @out totalOpMem: the opMem of every visited item is added onto the
 *             value it holds on entry
 *     @in DMem: how many memory need to decrease
 *     @in decreaseMem: true for phase 2, and availMem is smaller than groupMem,
 *             or false for phase 3, and availMem is bigger than groupMem
 *
 * Returns: the number of leading items of the sorted array that are chosen;
 *     0 when groupNum is 0.
 */
static int ChooseDMemItems(OpMemItem* itemArray, int groupNum, int* totalOpMem, int DMem, bool decreaseMem)
{
    CmpMethod sortMethod = OPMEM_CMP;
    int picked = 0;

    if (groupNum == 0) {
        return 0;
    }

    qsort_arg(itemArray, groupNum, sizeof(OpMemItem), OpMemItemCmp, &sortMethod);

    /* Head item is definitely need to decrease mem */
    while (picked < groupNum) {
        const bool isLastItem = (picked == groupNum - 1);

        *totalOpMem += itemArray[picked].opMem;

        if (decreaseMem) {
            /* If there's enough memory to decrease for the top ops, then just decrease them */
            if (*totalOpMem > (long)DMem + MAX_REASONABLE_MEM / 2 * (picked + 1)) {
                break;
            }
            /* If find the boundary, that there's a gap between groups, then exit */
            if (!isLastItem &&
                *totalOpMem > DMem + itemArray[picked + 1].opMem * picked * (1 + DECREASED_MIN_CMP_GAP * (picked + 1))) {
                break;
            }
        } else if (!isLastItem && itemArray[picked + 1].opMem == 0) {
            /* skip 0 groupOpMem */
            break;
        }

        /* stop on the last item so that the index never runs past the array */
        if (isLastItem) {
            break;
        }
        picked++;
    }

    return picked + 1;
}

/*
 * QueryNeedPlanB
 *     check if we need to do replanning for logic cluster.
 *
 *     Replanning is requested when memory control is active, the current
 *     user belongs to a logic cluster, and at least one nodegroup other
 *     than the user's own has query_mem[0] of SIMPLE_THRESHOLD or more.
 *
 *     Side effects: lc_replan_nodegroup is reset to InvalidOid on entry and
 *     set to the user's nodegroup when replanning is needed; the query_mem
 *     pair of the user's own nodegroup is raised to SIMPLE_THRESHOLD when it
 *     is smaller and replanning is needed.
 *
 * Parameters:
 *     @in stmt: original plan
 *
 * Returns: true if the query should be replanned, else false
 */
bool QueryNeedPlanB(PlannedStmt* stmt)
{
    bool result = false;
    Oid my_group_oid = InvalidOid;
    Oid my_user_oid = GetCurrentUserId();
    lc_replan_nodegroup = InvalidOid;

    /* no memory control */
    if (!ENABLE_WORKLOAD_CONTROL || !WLMIsInfoInit() || !t_thrd.utils_cxt.gs_mp_inited) {
        MEMCTL_LOG("Not replan for query: no memory control");
        return false;
    }

    /* invalid user */
    if (!OidIsValid(my_user_oid)) {
        MEMCTL_LOG("Not replan for query: invalid user oid");
        return false;
    }

    if (stmt->ng_num == 0) {
        MEMCTL_LOG("Not replan for query: nodegroup number 0");
        return false;
    }

    if (stmt->query_mem[0] < SIMPLE_THRESHOLD) {
        MEMCTL_LOG("Not replan for query: small query_mem[0]");
        return false;
    }

    /* not logic cluster */
    if (!in_logic_cluster()) {
        MEMCTL_LOG("Not replan for query: not logic cluster");
        return false;
    }

    /* no logic cluster belongs to */
    my_group_oid = get_pgxc_logic_groupoid(my_user_oid);
    if (!OidIsValid(my_group_oid)) {
        MEMCTL_LOG("Not replan for query: invalid nodegroup oid");
        return false;
    }

    /* check optimal_plan's mem */
    int my_group_idx = -1;
    for (int i = 0; i < stmt->ng_num; i++) {
        if (stmt->ng_queryMem[i].query_mem[0] >= SIMPLE_THRESHOLD && stmt->ng_queryMem[i].ng_oid != my_group_oid) {
            /* set replan nodegroup */
            lc_replan_nodegroup = my_group_oid;
            result = true;
            MEMCTL_LOG("Need replan for query: nodegroup %s, query_mem[0] %d",
                stmt->ng_queryMem[i].nodegroup,
                stmt->ng_queryMem[i].query_mem[0]);
        }

        if (stmt->ng_queryMem[i].ng_oid == my_group_oid) {
            my_group_idx = i;
        }

        /* both answers are known, no need to look at the remaining nodegroups */
        if (result && my_group_idx != -1) {
            break;
        }
    }

    Assert(my_group_idx != -1);
    /*
     * The Assert above is compiled out in non-assert builds, so the index is
     * checked again at run time: without the check a missing own nodegroup
     * would make the code below read and write stmt->ng_queryMem[-1].
     */
    if (result && my_group_idx != -1 && stmt->ng_queryMem[my_group_idx].query_mem[0] < SIMPLE_THRESHOLD) {
        MEMCTL_LOG("Need replan for query: set my group query_mem to 32M");
        stmt->ng_queryMem[my_group_idx].query_mem[0] = SIMPLE_THRESHOLD;
        stmt->ng_queryMem[my_group_idx].query_mem[1] = SIMPLE_THRESHOLD;
    }

    if (!result) {
        MEMCTL_LOG("Not replan for query: small query_mem[0] except my own nodegroup");
    }

    return result;
}

/*
 * SetNgAssignedQueryMem
 *     set assigned_query_mem[1] for each nodegroup to decrease mem later.
 *
 *     The entries of cxt->ng_queryMemKBArray that belong to the distributions
 *     of cxt->ng_distributionList start at index 1; the n-th distribution of
 *     the list is stored in slot n.
 *
 * Parameters:
 *     @in cxt: MethodPlanWalkerContext
 *     @in input_mem: value stored for the nodegroup of the current user
 *
 * Returns: void
 */
void SetNgAssignedQueryMem(MethodPlanWalkerContext* cxt, int input_mem)
{
    ListCell* cell = NULL;
    int assigned_query_mem;
    int slot = 0;
    Oid my_group = get_pgxc_logic_groupoid(GetCurrentUserId());
    int query_mem = ASSIGNED_QUERY_MEM(u_sess->attr.attr_sql.statement_mem, u_sess->attr.attr_sql.statement_max_mem);

    Assert(cxt->ng_distributionList);
    foreach (cell, cxt->ng_distributionList) {
        Distribution* dist = (Distribution*)lfirst(cell);

        /*
         * Advance the slot for every distribution, including the one of the
         * current user. Advancing it only for the other nodegroups would let
         * the next distribution overwrite the slot of the user's nodegroup
         * and shift all following entries by one.
         */
        slot++;

        if (my_group == dist->group_oid) {
            cxt->ng_queryMemKBArray[slot].assigned_query_mem_1 = input_mem;
            continue;
        }

        /* other nodegroups: start from the memory that the workload manager reports for them */
        char* group_name = get_pgxc_groupname(dist->group_oid);
        assigned_query_mem = WLMGetAvailbaleMemory(group_name) * 1024;
        Assert(assigned_query_mem >= 0);

        if (query_mem > 0) {
            cxt->ng_queryMemKBArray[slot].assigned_query_mem_1 = Min(assigned_query_mem, query_mem);
        } else {
            cxt->ng_queryMemKBArray[slot].assigned_query_mem_1 =
                ASSIGNED_QUERY_MEM(assigned_query_mem, u_sess->attr.attr_sql.statement_max_mem);
        }

        MEMCTL_LOG("Set available mem: nodegroup %s, mem %d", group_name, assigned_query_mem);
    }
}

/*
 * ReSetNgQueryMem
 *     reset query_mem if exceeds max mem, for logic cluster. Both entries of
 *     every nodegroup's query_mem are limited to result->assigned_query_mem[1];
 *     afterwards a query_mem[1] of 0 is replaced by query_mem[0].
 *
 * Parameters:
 *     @in result: PlannedStmt
 *
 * Returns: void
 */
void ReSetNgQueryMem(PlannedStmt* result)
{
    const int memCap = result->assigned_query_mem[1];

    Assert(result->ng_num > 0);

    for (int ng = 0; ng < result->ng_num; ng++) {
        /* neither entry may exceed the assigned query memory */
        for (int slot = 0; slot < 2; slot++) {
            if (result->ng_queryMem[ng].query_mem[slot] > memCap) {
                result->ng_queryMem[ng].query_mem[slot] = memCap;
            }
        }

        /* an unset second entry falls back to the first one */
        if (result->ng_queryMem[ng].query_mem[1] == 0) {
            result->ng_queryMem[ng].query_mem[1] = result->ng_queryMem[ng].query_mem[0];
        }
    }
}

/*
 * ReleaseGroup
 *     release memory of OperatorGroupNode after usage. The child groups are
 *     released recursively before the node itself.
 *
 * Parameters:
 *     @in groupNode: OperatorGroupNode to be free, may be NULL
 *
 * Returns: void
 */
void ReleaseGroup(OperatorGroupNode* groupNode)
{
    if (groupNode == NULL) {
        return;
    }

    if (groupNode->childGroups) {
        ListCell* lc = NULL;
        foreach (lc, groupNode->childGroups) {
            OperatorGroupNode* childGroup = (OperatorGroupNode*)lfirst(lc);
            ReleaseGroup(childGroup);
        }

        /*
         * childGroups is a List, so it is released through the list API
         * (as ReleaseResource does for its lists) instead of a plain pfree
         * of the list pointer.
         */
        list_free(groupNode->childGroups);
        groupNode->childGroups = NIL;
    }

    if (groupNode->ng_groupMemKBArray) {
        pfree_ext(groupNode->ng_groupMemKBArray);
    }

    pfree_ext(groupNode);
}

/*
 * ReleaseResource
 *     release the memory owned by a MethodPlanWalkerContext after usage:
 *     the init plan list, the traverse flag, the operator group tree, the
 *     nodegroup distribution list and the per-nodegroup query memory array.
 *     The context structure itself is not freed.
 *
 * Parameters:
 *     @in ctx: MethodPlanWalkerContext to be free; a NULL pointer is only logged
 *
 * Returns: void
 */
void ReleaseResource(MethodPlanWalkerContext* ctx)
{
    if (ctx == NULL) {
        MEMCTL_LOG("MethodEarlyFreeContext is NULL. Something is wrong. ");
        return;
    }

    /* members of the base walker context */
    if (ctx->base.init_plans != NIL) {
        list_free(ctx->base.init_plans);
        ctx->base.init_plans = NIL;
    }
    if (ctx->base.traverse_flag != NULL) {
        pfree_ext(ctx->base.traverse_flag);
    }

    /* the whole operator group tree */
    ReleaseGroup(ctx->groupTree);

    /* nodegroup bookkeeping: the list cells together with the elements they point to */
    if (ctx->ng_distributionList != NIL) {
        list_free_deep(ctx->ng_distributionList);
        ctx->ng_distributionList = NIL;
    }
    if (ctx->ng_queryMemKBArray != NULL) {
        pfree_ext(ctx->ng_queryMemKBArray);
    }

    /* no need to pfree myself */
}

/*
 * GetHashaggInitializedMem
 *     estimated memory used by hash agg node at initialization. The number
 *     of aggregate expressions is approximated by the target list length
 *     minus the number of grouping columns (node->numCols), never less
 *     than 0.
 *
 * Parameters:
 *     @in node: Hash Agg Node
 *
 * Returns: estimated memory.
 */
static int GetHashaggInitializedMem(Agg* node)
{
/* general memory cost of initializing an sonic agg node in KB */
#define SMALL_MEM_SONIC 400
/* general memory cost of initializing an agg node */
#define SMALL_MEM_NORMAL 192
/* estimated memory used by each agg expr */
#define PER_AGG_MEM 150
/* estimated size of each datum in sonic data array */
#define ITEMSIZE 0.009
/* size of each sonic data array */
#define INIT_DATUM_ARRAY_SIZE (16 * 1024)

    const int groupColNum = node->numCols;
    /* list_length() copes with an empty (NIL) target list */
    const int targetNum = list_length(node->plan.targetlist);
    /*
     * With more grouping columns than target entries the difference is
     * negative; clamp it so that it cannot shrink the estimate or turn it
     * negative.
     */
    const int estAggNum = Max(0, targetNum - groupColNum);

    if (node->is_sonichash) {
        /* sonic hash agg additionally keeps one data array per stored column */
        return (int)(SMALL_MEM_SONIC + estAggNum * PER_AGG_MEM +
                     INIT_DATUM_ARRAY_SIZE * (estAggNum + groupColNum) * ITEMSIZE);
    }

    return SMALL_MEM_NORMAL + estAggNum * PER_AGG_MEM;
}
